Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PostgreSQL connection issues when using in combination with RabbitMQ transport #1235

Open
zeko77 opened this issue Jun 29, 2023 · 6 comments

Comments

@zeko77
Copy link

zeko77 commented Jun 29, 2023

Describe the bug

Description

I noticed that execution of my saga handlers is constantly immediately retried under normal circumstances (no load at all) when using PostgreSQL persistence.
After detailed examination, I concluded that connection gets unexpectedly disposed or closed.
I have tried to fiddle with the connection settings and pooling, but to no avail.
Switching to MySQL works with no issues, so I suspect Npgsql.

Expected behavior

Saga handlers should execute with no immediate retries under normal circumstances.

Actual behavior

Handlers are almost always retried due to disposed or closed connection.

Versions

  • NServiceBus 8.1.1
  • NServiceBus.Persistence.Sql 7.0.2
  • Npgsql 7.0.4
  • PosrgreSQL 15.2 and 15.3

Steps to reproduce

Here is the solution to reproduce the issue:
https://github.com/zeko77/PostgresSagaIssue

Relevant log output

I have disabled retires in order to log the failure.

fail: NServiceBus.MoveToError[0]
      Moving message 'e6f76034-73c1-4a4f-af5a-b03000ed68a3' to the error queue 'error' because processing failed due to an exception:
      System.ObjectDisposedException: NpgsqlTransaction
         at Npgsql.NpgsqlTransaction.CheckDisposed()
         at Npgsql.NpgsqlTransaction.CheckReady()
         at Npgsql.NpgsqlTransaction.Commit(Boolean async, CancellationToken cancellationToken)
         at StorageSession.CompleteAsync(CancellationToken cancellationToken) in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 97
         at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
         at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
         at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
         at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
         at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
         at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
         at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
         at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402

Additional Information

No response

@zeko77 zeko77 added the Bug label Jun 29, 2023
@ramonsmits
Copy link
Member

@zeko77 Our last release was tested ok: https://github.com/Particular/NServiceBus.Persistence.Sql/actions/runs/5312223106

It was also using Npgsql 7.0.4 but based on the CI workflow tested against postgres:14.

I'll run your repro Tomorrow to see that happens on my workstation.

@ramonsmits
Copy link
Member

@zeko77 I tested your repro and https://github.com/Particular/docs.particular.net/blob/master/samples/sql-persistence/simple/SqlPersistence_7/

Our sample works fine with Postgres 14 and 15 but yours is using RabbitMQ transport and our sample the learning transport.

After converting your repro to learning transport it works.

Its seems we have an incompatibility between RabbitMQ and the Postgres dialect.

I got the following various different errors. It seems as if the lifetime for various objects are incorrectly managed. Its as if registrations of certain types either are not registered as transient or as if the lifetime scope isn't correctly defined resulting in re-use of

System.InvalidOperationException: The reader is closed

      System.InvalidOperationException: The reader is closed
         at Npgsql.NpgsqlDataReader.GetFieldValueSequential[T](Int32 column, Boolean async, CancellationToken cancellationToken)
         at SagaPersister.GetSagaData[TSagaData](ISynchronizedStorageSession session, String commandText, RuntimeSagaInfo sagaInfo, ParameterAppender appendParameters, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 92
         at SagaPersister.Get[TSagaData](String propertyName, Object propertyValue, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 31
         at NServiceBus.PropertySagaFinder`1.Find(IServiceProvider builder, SagaFinderDefinition finderDefinition, ISynchronizedStorageSession storageSession, ContextBag context, Object message, IReadOnlyDictionary`2 messageHeaders, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Sagas/PropertySagaFinder.cs:line 42
         at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs:line 77
         at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 40
         at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
         at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
         at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
         at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
         at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
         at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
         at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402

Npgsql.NpgsqlOperationInProgressException (0x80004005): The connection is already in state 'Executing'

      Npgsql.NpgsqlOperationInProgressException (0x80004005): The connection is already in state 'Executing'
         at Npgsql.Internal.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|279_0(ConnectorState newState, NpgsqlCommand command, <>c__DisplayClass279_0&)
         at Npgsql.Internal.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command, CancellationToken cancellationToken, Boolean attemptPgCancellation)
         at Npgsql.Internal.NpgsqlConnector.StartUserAction(CancellationToken cancellationToken, Boolean attemptPgCancellation)
         at Npgsql.NpgsqlTransaction.Commit(Boolean async, CancellationToken cancellationToken)
         at StorageSession.CompleteAsync(CancellationToken cancellationToken) in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 97
         at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
         at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
         at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
         at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
         at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
         at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
         at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
         at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402

Npgsql.NpgsqlOperationInProgressException (0x80004005): A command is already in progress:

      Npgsql.NpgsqlOperationInProgressException (0x80004005): A command is already in progress:
         at Npgsql.Internal.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|279_0(ConnectorState newState, NpgsqlCommand command, <>c__DisplayClass279_0&)
         at Npgsql.Internal.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command, CancellationToken cancellationToken, Boolean attemptPgCancellation)
         at Npgsql.Internal.NpgsqlConnector.StartUserAction(CancellationToken cancellationToken, Boolean attemptPgCancellation)
         at Npgsql.Internal.NpgsqlConnector.Reset(Boolean async)
         at Npgsql.NpgsqlConnection.CloseAsync(Boolean async)
         at Npgsql.NpgsqlConnection.Close()
         at Npgsql.NpgsqlConnection.Dispose(Boolean disposing)
         at System.ComponentModel.Component.Dispose()
         at StorageSession.Dispose() in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 114
         at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.DisposeAsync()
      --- End of stack trace from previous location ---
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
         at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402

System.ObjectDisposedException: NpgsqlTransaction

      System.ObjectDisposedException: NpgsqlTransaction
         at Npgsql.NpgsqlTransaction.CheckDisposed()
         at Npgsql.NpgsqlTransaction.CheckReady()
         at Npgsql.NpgsqlTransaction.Commit(Boolean async, CancellationToken cancellationToken)
         at StorageSession.CompleteAsync(CancellationToken cancellationToken) in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 97
         at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
         at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
         at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
         at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
         at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
         at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
         at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
         at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402

Npgsql.NpgsqlException (0x80004005): Exception while reading from stream

      Npgsql.NpgsqlException (0x80004005): Exception while reading from stream
       ---> System.IO.IOException: Unable to read data from the transport connection: An established connection was aborted by the software in your host machine..
       ---> System.Net.Sockets.SocketException (10053): An established connection was aborted by the software in your host machine.
         at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
         --- End of inner exception stack trace ---
         at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
         at Npgsql.Internal.NpgsqlReadBuffer.<Ensure>g__EnsureLong|42_0(NpgsqlReadBuffer buffer, Int32 count, Boolean async, Boolean readingNotifications)
         at Npgsql.Internal.NpgsqlReadBuffer.<Ensure>g__EnsureLong|42_0(NpgsqlReadBuffer buffer, Int32 count, Boolean async, Boolean readingNotifications)
         at Npgsql.Internal.NpgsqlReadBuffer.Skip(Int64 len, Boolean async)
         at Npgsql.Internal.NpgsqlReadBuffer.ColumnStream.DisposeAsync(Boolean disposing, Boolean async)
         at Npgsql.Internal.NpgsqlReadBuffer.ColumnStream.Dispose(Boolean disposing)
         at System.IO.Stream.Close()
         at Npgsql.PreparedTextReader.Dispose(Boolean disposing)
         at System.IO.TextReader.Close()
         at Newtonsoft.Json.JsonTextReader.Close()
         at Newtonsoft.Json.JsonReader.Dispose(Boolean disposing)
         at Newtonsoft.Json.JsonReader.System.IDisposable.Dispose()
         at Serializer.Deserialize[T](TextReader reader) in /_/src/SqlPersistence/Serializer.cs:line 36
         at SagaPersister.ReadMetadata(DbDataReader dataReader, String& originator, String& originalMessageId) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 112
         at SagaPersister.GetSagaData[TSagaData](ISynchronizedStorageSession session, String commandText, RuntimeSagaInfo sagaInfo, ParameterAppender appendParameters, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 95
         at SagaPersister.Get[TSagaData](String propertyName, Object propertyValue, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 31
         at NServiceBus.PropertySagaFinder`1.Find(IServiceProvider builder, SagaFinderDefinition finderDefinition, ISynchronizedStorageSession storageSession, ContextBag context, Object message, IReadOnlyDictionary`2 messageHeaders, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Sagas/PropertySagaFinder.cs:line 42
         at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs:line 154
         at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 50
         at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
         at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
         at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
         at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
         at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 68
         at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
         at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
         at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 446

@ramonsmits
Copy link
Member

@zeko77 Can you share how you are currently affected and also if you are a licensed customer? If you don't want to disclose this publicly then please provide these details via support@particular.net and please mention this issue url and my name.

@ramonsmits ramonsmits changed the title PostgreSQL connection breakdown in saga handlers causes immediate retries PostgreSQL connection issues when using in combination with RabbitMQ transport Jun 30, 2023
@zeko77
Copy link
Author

zeko77 commented Jun 30, 2023

Thanks for prompt response.
I too am getting all of the exceptions that you mentioned.
I am a licensed customer, but this issue occurs on a new project that is in still development phase. I accidentally took a peek at logs and discovered this. Since this is resolved on a first retry, it went under radar for quite some time.

@danielmarbach
Copy link
Contributor

@zeko77 We've prioritized your issue near the top of our list and will work on it as soon as we can.

@rhysparry
Copy link

I encountered this issue while working on a simple project with a RabbitMQ transport and PostgreSQL persistence. In my case the exceptions stopped happening when I enabled the Outbox.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants