Skip to content

Commit

Permalink
consumers shouldn't be able to receive any messages until connection …
Browse files Browse the repository at this point in the history
…is started
  • Loading branch information
HavretGC committed Jun 17, 2019
1 parent 1cda254 commit 2be0f8b
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 12 deletions.
13 changes: 13 additions & 0 deletions src/NMS.AMQP/NmsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class NmsConnection : IConnection, IProviderListener
private IdGenerator sessionIdGenerator;
private readonly AtomicBool connected = new AtomicBool();
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();

public NmsConnection(ConnectionInfo connectionInfo, IProvider provider)
{
Expand Down Expand Up @@ -90,6 +91,11 @@ public ISession CreateSession(AcknowledgementMode acknowledgementMode)
{
session.Begin().ConfigureAwait(false).GetAwaiter().GetResult();
sessions.TryAdd(session.SessionInfo.Id, session);
if (started)
{
session.Start();
}

return session;
}
catch (NMSException)
Expand Down Expand Up @@ -166,6 +172,13 @@ private IdGenerator SessionIdGenerator

public void Start()
{
if (started.CompareAndSet(false, true))
{
foreach (var session in sessions.Values.ToArray())
{
session.Start();
}
}
}

public bool IsStarted { get; set; }
Expand Down
54 changes: 53 additions & 1 deletion src/NMS.AMQP/NmsMessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Util;
Expand All @@ -8,11 +9,12 @@ namespace Apache.NMS.AMQP
public class NmsMessageConsumer : IMessageConsumer
{
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();
private readonly AcknowledgementMode acknowledgementMode;
public NmsSession Session { get; }
public ConsumerInfo Info { get; }
public IDestination Destination => Info.Destination;


public NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : this(consumerId, session, destination, null, selector, noLocal)
{
Expand Down Expand Up @@ -123,6 +125,50 @@ public IMessage ReceiveNoWait()
}

public IMessage Receive(TimeSpan timeout)
{
if (started)
{
return ReceiveInternal(timeout);
}

DateTime deadline = GetDeadline(timeout);

while (true)
{
timeout = deadline - DateTime.UtcNow;
if (timeout < TimeSpan.Zero)
{
return null;
}

if (started)
{
return ReceiveInternal(timeout);
}
}
}

private static DateTime GetDeadline(TimeSpan timeout)
{
if (timeout == TimeSpan.MaxValue)
{
return DateTime.MaxValue;
}

DateTime deadline;
try
{
deadline = DateTime.UtcNow + timeout;
}
catch (OverflowException)
{
deadline = DateTime.MaxValue;
}

return deadline;
}

private IMessage ReceiveInternal(TimeSpan timeout)
{
CheckClosed();
CheckMessageListener();
Expand Down Expand Up @@ -180,7 +226,13 @@ public void Shutdown()
if (closed.CompareAndSet(false, true))
{
Session.Remove(this);
started.Set(false);
}
}

public void Start()
{
started.Set(true);
}
}
}
22 changes: 21 additions & 1 deletion src/NMS.AMQP/NmsSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public class NmsSession : ISession

private readonly NestedIdGenerator consumerIdGenerator;
private readonly NestedIdGenerator producerIdGenerator;
private AtomicBool closed = new AtomicBool(false);
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();

public SessionInfo SessionInfo { get; }
public NmsConnection Connection { get; }
Expand Down Expand Up @@ -79,6 +80,10 @@ public IMessageConsumer CreateConsumer(IDestination destination, string selector
NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
if (started)
{
messageConsumer.Start();
}
return messageConsumer;
}

Expand All @@ -90,6 +95,10 @@ public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, s
messageConsumer.Info.IsDurable = true;
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
if (started)
{
messageConsumer.Start();
}
return messageConsumer;
}

Expand Down Expand Up @@ -353,5 +362,16 @@ public void Shutdown(NMSException exception)
}
}
}

public void Start()
{
if (started.CompareAndSet(false, true))
{
foreach (NmsMessageConsumer consumer in consumers.Values.ToArray())
{
consumer.Start();
}
}
}
}
}
1 change: 1 addition & 0 deletions src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public Task Attach()
{
taskCompletionSource.SetResult(true);
});

link.AddClosedCallback(OnClosed);
return taskCompletionSource.Task;
}
Expand Down
2 changes: 1 addition & 1 deletion test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void TestAcknowledgeIndividualMessagesAsync()
latch.countDown();
};

latch.await(TimeSpan.FromMilliseconds(100));
Assert.True(latch.await(TimeSpan.FromMilliseconds(1000)));

// Acknowledge the messages in a random order and verify the individual dispositions have expected delivery state.
Random random = new Random();
Expand Down
37 changes: 32 additions & 5 deletions test/Integration/ConsumerIntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Apache.NMS;
using Apache.NMS.AMQP;
using Apache.NMS.AMQP.Provider.Amqp.Message;
using Apache.NMS.Util;
using Moq;
using NMS.AMQP.Test.TestAmqp;
using NUnit.Framework;
Expand Down Expand Up @@ -51,6 +50,7 @@ public void TestRemotelyCloseConsumer()
{
testAmqpPeer.Open();
NmsConnection connection = (NmsConnection)EstablishConnection();
connection.Start();
connection.AddConnectionListener(mockConnectionListener.Object);
connection.ExceptionListener += exception => { exceptionFired = true; };

Expand Down Expand Up @@ -82,6 +82,7 @@ public void TestRemotelyCloseConsumerWithMessageListenerFiresExceptionListener()
{
testAmqpPeer.Open();
NmsConnection connection = (NmsConnection)EstablishConnection();
connection.Start();
connection.AddConnectionListener(mockConnectionListener.Object);
connection.ExceptionListener += exception => { exceptionFired = true; };

Expand All @@ -101,6 +102,8 @@ public void TestRemotelyCloseConsumerWithMessageListenerFiresExceptionListener()
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.Close();
session.Close();
connection.Close();
}
}

Expand All @@ -113,6 +116,7 @@ public void TestReceiveMessageWithReceiveZeroTimeout()
testAmqpPeer.SendMessage("myQueue", "test");

NmsConnection connection = (NmsConnection)EstablishConnection();
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue queue = session.GetQueue("myQueue");
IMessageConsumer consumer = session.CreateConsumer(queue);
Expand All @@ -133,6 +137,7 @@ public void TestExceptionInOnMessageReleasesInAutoAckMode()
testAmqpPeer.SendMessage("myQueue", "test");

NmsConnection connection = (NmsConnection)EstablishConnection();
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue queue = session.GetQueue("myQueue");
IMessageConsumer consumer = session.CreateConsumer(queue);
Expand Down Expand Up @@ -241,6 +246,7 @@ private void DoTestConsumerReceiveThrowsIfConnectionLost(bool useTimeout)
testAmqpPeer.Open();

IConnection connection = EstablishConnection();
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
ITopic topic = session.GetTopic(topicName);
testAmqpPeer.SendMessage(topicName, "test");
Expand Down Expand Up @@ -278,6 +284,7 @@ public void TestConsumerReceiveNoWaitThrowsIfConnectionLost()
testAmqpPeer.Open();

IConnection connection = EstablishConnection();
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
ITopic topic = session.GetTopic(topicName);
testAmqpPeer.SendMessage(topicName, "test");
Expand Down Expand Up @@ -310,11 +317,12 @@ public void TestSetMessageListenerAfterStartAndSend()
}

NmsConnection connection = (NmsConnection)EstablishConnection();
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue queue = session.GetQueue("myQueue");
IMessageConsumer consumer = session.CreateConsumer(queue);

consumer.Listener += message => {};
consumer.Listener += message => { };

Assert.That(() => testAmqpPeer.AcceptedMessages.Count(), Is.EqualTo(messageCount).After(2000, 100));

Expand All @@ -324,12 +332,31 @@ public void TestSetMessageListenerAfterStartAndSend()
}
}

[Test]
public void TestNoReceivedMessagesWhenConnectionNotStarted()
{
using (TestAmqpPeer testAmqpPeer = new TestAmqpPeer(Address, User, Password))
{
testAmqpPeer.Open();
testAmqpPeer.SendMessage("myQueue", "test");

NmsConnection connection = (NmsConnection)EstablishConnection();
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue queue = session.GetQueue("myQueue");
IMessageConsumer consumer = session.CreateConsumer(queue);

Assert.Null(consumer.Receive(TimeSpan.FromMilliseconds(100)));

consumer.Close();
session.Close();
connection.Close();
}
}

private IConnection EstablishConnection()
{
NmsConnectionFactory factory = new NmsConnectionFactory(Address);
IConnection connection = factory.CreateConnection(User, Password);
connection.Start();
return connection;
return factory.CreateConnection(User, Password);
}
}
}
1 change: 1 addition & 0 deletions test/Integration/SessionIntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void TestCreateDurableConsumer()
testAmqpPeer.Open();

NmsConnection connection = (NmsConnection)EstablishConnection();
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

string topicName = "myTopic";
Expand Down
8 changes: 4 additions & 4 deletions test/TestAmqp/TestMessageSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public TestMessageSource()
this.acceptedMessages = new List<Amqp.Message>();
}

public IEnumerable<Amqp.Message> RejectedMessage => rejectedMessages;
public IEnumerable<Amqp.Message> ReleasedMessages => releasedMessages;

public IEnumerable<Amqp.Message> AcceptedMessages => acceptedMessages;

public Task<ReceiveContext> GetMessageAsync(ListenerLink link)
Expand Down Expand Up @@ -55,13 +55,13 @@ public void DisposeMessage(ReceiveContext receiveContext, DispositionContext dis
switch (dispositionContext.DeliveryState)
{
case Accepted _:
acceptedMessages.Add(receiveContext.Message);
this.acceptedMessages.Add(receiveContext.Message);
break;
case Rejected _:
rejectedMessages.Add(receiveContext.Message);
this.rejectedMessages.Add(receiveContext.Message);
break;
case Released _:
releasedMessages.Add(receiveContext.Message);
this.releasedMessages.Add(receiveContext.Message);
break;
}

Expand Down

0 comments on commit 2be0f8b

Please sign in to comment.