Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Consumer breaks when Consumer Timeout is reached #5387

Closed
Sirozha1337 opened this issue Aug 6, 2024 · 5 comments
Closed

Batch Consumer breaks when Consumer Timeout is reached #5387

Sirozha1337 opened this issue Aug 6, 2024 · 5 comments
Labels

Comments

@Sirozha1337
Copy link

Contact Details

No response

Version

8.x (develop, pre-release), 8.x

On which operating system(s) are you experiencing the issue?

Windows

Using which broker(s) did you encounter the issue?

RabbitMQ

What are the steps required to reproduce the issue?

1. Set consumer_timeout in RabbitMQ settings to 1.5 minutes
2. Create Batch consumer with TimeLimit 2 minutes and MessageLimit 10
3. Generate 9 messages
4. Wait
5. The consumer will throw errors and will process those messages multiple times

What is the expected behavior?

  1. Maybe a warning or even an error at start up saying that you can't have "consumer_timeout" less than "TimeLimit" for a Batch consumer.
  2. Another possible way, override either consumer_timeout with TimeLimit or TimeLimit with consumer timeout, so that it works properly
  3. Maybe after recovering the connection and processing messages, it should actually acknowledge them. Currently it throws timeout error, but it's not possible that it hit the 1.5 minute timeout, since the connection was just restarted and my processing function is just logging text to console.

What actually happened?

Connection is broken due to timeout, then it is restored and messages are processed right away, but they are not acknowledged. After a few more minutes connection is broken once again, then it recovers and again starts processing same messages without acknowledging them.

Related log output, including any exceptions

11:02:39.237-D Starting bus: rabbitmq://localhost/
11:02:39.318-D Connect: guest@localhost:5672/
11:02:39.477-D Connected: guest@localhost:5672/ (address: amqp://localhost:5672, local: 57287)
11:02:39.556-D Endpoint Ready: rabbitmq://localhost/LAPTOP6B26JPGF_RabbitConsumer_bus_ptqyyygdgcre6oj1bdqmm7wffs?temporary=true
11:02:39.595-D Declare queue: name: TextMessage, durable, x-consumer-timeout: 10000, consumer-count: 0 message-count: 0
11:02:39.625-D Declare exchange: name: TextMessage, type: fanout, durable
11:02:39.637-D Bind queue: source: TextMessage, destination: TextMessage
11:02:39.735-D Consumer Ok: rabbitmq://localhost/TextMessage - amq.ctag-c6Ccabqsa_t5luyYZDPKwQ
11:02:39.736-D Endpoint Ready: rabbitmq://localhost/TextMessage
11:02:39.741-I Bus started: rabbitmq://localhost/
Listening for messages. Hit <return> to quit.
11:03:53.859-D Consumer Model Shutdown: rabbitmq://localhost/TextMessage - amq.ctag-c6Ccabqsa_t5luyYZDPKwQ, Concurrent Peak: 6, 406-PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more
11:03:53.870-D Consumer Stopping: rabbitmq://localhost/TextMessage (Consume Loop Exited)
Processing batch of 6 messages
Got message: 8/6/2024 9:04:53 AM High Priority 1 High
Got message: 8/6/2024 9:04:53 AM High Priority 2 High
Got message: 8/6/2024 9:04:53 AM High Priority 3 High
Got message: 8/6/2024 9:04:53 AM High Priority 4 High
Got message: 8/6/2024 9:04:53 AM High Priority 5 High
Got message: 8/6/2024 9:04:53 AM High Priority 6 High
11:04:54.013-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-6a24-08dcb5f68e97 MassTransit.Batch<Messages.TextMessage> MyHighTextMessageConsumer(00:00:00.0046474)
11:04:54.052-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-924c-08dcb5f68e7b Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.1097546)
11:04:54.052-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-6792-08dcb5f68e97 Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.1097765)
11:04:54.052-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-5e5a-08dcb5f68e97 Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.1097921)
11:04:54.052-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-6a24-08dcb5f68e97 Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.1097895)
11:04:54.052-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-1ada-08dcb5f68e6d Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.1097337)
11:04:54.052-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-9769-08dcb5f68e7b Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.1097292)
11:04:54.819-D Endpoint Completed: rabbitmq://localhost/TextMessage
11:04:54.822-D Consumer Completed: rabbitmq://localhost/TextMessage: 6 received, 6 concurrent, amq.ctag-c6Ccabqsa_t5luyYZDPKwQ
11:04:54.819-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-924c-08dcb5f68e7b

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:04:54.819-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-6792-08dcb5f68e97

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:04:54.819-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-5e5a-08dcb5f68e97

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:04:54.819-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-6a24-08dcb5f68e97

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:04:54.819-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-9769-08dcb5f68e7b

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:04:54.819-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-1ada-08dcb5f68e6d

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:04:55.834-D Declare queue: name: TextMessage, durable, x-consumer-timeout: 10000, consumer-count: 0 message-count: 6
11:04:55.845-D Declare exchange: name: TextMessage, type: fanout, durable
11:04:55.846-D Bind queue: source: TextMessage, destination: TextMessage
11:04:55.863-D Consumer Ok: rabbitmq://localhost/TextMessage - amq.ctag-c6Ccabqsa_t5luyYZDPKwQ
11:04:55.863-D Endpoint Ready: rabbitmq://localhost/TextMessage
11:05:55.834-D Consumer Model Shutdown: rabbitmq://localhost/TextMessage - amq.ctag-c6Ccabqsa_t5luyYZDPKwQ, Concurrent Peak: 6, 406-PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more
11:05:55.834-D Consumer Stopping: rabbitmq://localhost/TextMessage (Consume Loop Exited)
Processing batch of 6 messages
Got message: 8/6/2024 9:06:55 AM High Priority 1 High
Got message: 8/6/2024 9:06:55 AM High Priority 2 High
Got message: 8/6/2024 9:06:55 AM High Priority 3 High
Got message: 8/6/2024 9:06:55 AM High Priority 4 High
Got message: 8/6/2024 9:06:55 AM High Priority 5 High
Got message: 8/6/2024 9:06:55 AM High Priority 6 High
11:06:55.875-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-6a24-08dcb5f68e97 MassTransit.Batch<Messages.TextMessage> MyHighTextMessageConsumer(00:00:00.0005789)
11:06:55.890-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-1ada-08dcb5f68e6d Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.0262725)
11:06:55.891-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-5e5a-08dcb5f68e97 Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.0272401)
11:06:55.913-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-6792-08dcb5f68e97 Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.0484478)
11:06:55.913-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-9769-08dcb5f68e7b Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.0484459)
11:06:55.913-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-924c-08dcb5f68e7b Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.0484479)
11:06:55.954-D RECEIVE rabbitmq://localhost/TextMessage 2c730000-c333-088f-6a24-08dcb5f68e97 Messages.TextMessage MassTransit.Batching.BatchConsumer<Messages.TextMessage>(00:02:00.0904530)
11:06:56.451-D Endpoint Completed: rabbitmq://localhost/TextMessage
11:06:56.460-D Consumer Completed: rabbitmq://localhost/TextMessage: 6 received, 6 concurrent, amq.ctag-c6Ccabqsa_t5luyYZDPKwQ
11:06:56.461-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-6792-08dcb5f68e97

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:06:56.504-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-924c-08dcb5f68e7b

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:06:56.575-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-6a24-08dcb5f68e97

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:06:56.575-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-1ada-08dcb5f68e6d

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:06:56.575-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-9769-08dcb5f68e7b

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:06:56.593-E T-FAULT rabbitmq://localhost/TextMessage 2c730000-c333-088f-5e5a-08dcb5f68e97

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=491, text='Channel is already closed: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0', classId=0, methodId=0
   at MassTransit.RabbitMqTransport.RabbitMqReceiveLockContext.Complete() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqReceiveLockContext.cs:line 24
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 67
   at MassTransit.Transports.PendingReceiveLockContext.Execute(Func`2 action) in /_/src/MassTransit/Transports/PendingReceiveLockContext.cs:line 79
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 71
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
   at MassTransit.RabbitMqTransport.RabbitMqBasicConsumer.<>c__DisplayClass21_0.<<HandleBasicDeliver>b__0>d.MoveNext() in /_/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs:line 168
11:06:57.470-D Declare queue: name: TextMessage, durable, x-consumer-timeout: 10000, consumer-count: 0 message-count: 6
11:06:57.480-D Declare exchange: name: TextMessage, type: fanout, durable
11:06:57.481-D Bind queue: source: TextMessage, destination: TextMessage
11:06:57.499-D Consumer Ok: rabbitmq://localhost/TextMessage - amq.ctag-c6Ccabqsa_t5luyYZDPKwQ
11:06:57.499-D Endpoint Ready: rabbitmq://localhost/TextMessage
11:07:57.470-D Consumer Model Shutdown: rabbitmq://localhost/TextMessage - amq.ctag-c6Ccabqsa_t5luyYZDPKwQ, Concurrent Peak: 6, 406-PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 10000 ms. This timeout value can be configured, see consumers doc guide to learn more
11:07:57.470-D Consumer Stopping: rabbitmq://localhost/TextMessage (Consume Loop Exited)

Link to repository that demonstrates/reproduces the issue

https://github.com/Sirozha1337/rabbitmq-masstransit-batch-timeout-repro

@phatboyg
Copy link
Member

phatboyg commented Aug 6, 2024

First, a few points:

  1. You shouldn't be using the RabbitMQ consumer_timeout this way.
  2. The obvious workaround is to ensure the consumer_timeout higher than the batch TimeLimit + the time it takes for the batch consumer to actually complete the work.

MassTransit is like a sharp pair of scissors, if you aren't careful, you can cut yourself.

That stated, it's an interesting way to simulate these failures (broker closing channel).

@Sirozha1337
Copy link
Author

  1. I know, this is just an example. In my real life scenario this consumer_timeout was buried inside rabbitmq.conf (and was much longer naturally).
  2. Yes, I understand and using this approach now. But when I encountered this issue, it was not obvious from the documentation and the error log. Maybe consumer_timeout should be mentioned here: https://masstransit.io/documentation/concepts/consumers#batch-consumers

Also, maybe there should be other type of BatchConsumer, that doesn't fetch messages from the queue until there are enough messages to fill the batch or if time limit is exceeded.

@phatboyg
Copy link
Member

phatboyg commented Aug 6, 2024

  1. It's 100% a RabbitMQ thing.
  2. RabbitMQ specific details in the general batch consumer documentation isn't likely to happen.

Messages are pushed from the broker, not pulled, so an "other type of batch consumer" isn't even feasible.

@Sirozha1337
Copy link
Author

Messages are pushed from the broker, not pulled, so an "other type of batch consumer" isn't even feasible.

Ah, ok, I see. Though I don't agree about feasibility, at least with RabbitMQ as a transport. Something like this could be done by starting/stopping the consumer with BasicConsume/BasicCancel, but I guess the solution will be messy because of the possible concurrency issues.

phatboyg added a commit that referenced this issue Aug 12, 2024
…ed from the batch. Empty batches should be "ignored."
@phatboyg
Copy link
Member

I kept this issue open to give myself a reminder to add cancellation to batch consumers, now that's done and this will be closed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants