Skip to content

Commit

Permalink
Connection start / stop scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Jul 4, 2019
1 parent 9f4209e commit 1aef201
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 57 deletions.
7 changes: 7 additions & 0 deletions src/NMS.AMQP/NmsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,14 @@ public void Dispose()

public void Stop()
{
CheckClosed();
CheckIsOnDeliveryThread();

if (started.CompareAndSet(true, false))
{
foreach (NmsSession session in sessions.Values)
session.Stop();
}
}

public ISession CreateSession()
Expand Down
12 changes: 10 additions & 2 deletions src/NMS.AMQP/NmsMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destin
SubscriptionName = name
};
deliveryTask = new MessageDeliveryTask(this);

if (Session.IsStarted)
Start();
}

public void Dispose()
Expand Down Expand Up @@ -95,6 +98,7 @@ event MessageListener IMessageConsumer.Listener
public async Task Init()
{
await Session.Connection.CreateResource(Info);
await Session.Connection.StartResource(Info);
}

public void OnInboundMessage(InboundMessageDispatch envelope)
Expand All @@ -111,7 +115,7 @@ public void OnInboundMessage(InboundMessageDispatch envelope)

private void DeliverNextPending()
{
if (Session.IsStarted && Listener != null && messageQueue.TryTake(out InboundMessageDispatch envelope))
if (Session.IsStarted && started && Listener != null && messageQueue.TryTake(out InboundMessageDispatch envelope))
{
if (IsMessageExpired(envelope))
{
Expand Down Expand Up @@ -357,7 +361,6 @@ public void Start()
{
if (started.CompareAndSet(false, true))
{
Session.Connection.StartResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
DrainMessageQueueToListener();
}
}
Expand Down Expand Up @@ -399,5 +402,10 @@ public void DeliverNextPending()
consumer.DeliverNextPending();
}
}

public void Stop()
{
started.Set(false);
}
}
}
65 changes: 19 additions & 46 deletions src/NMS.AMQP/NmsSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ public class NmsSession : ISession
private readonly NestedIdGenerator producerIdGenerator;
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();
private readonly AtomicBool onDispatchThread = new AtomicBool();

public SessionInfo SessionInfo { get; }
public NmsConnection Connection { get; }

private ActionBlock<NmsMessageConsumer.MessageDeliveryTask> dispatcher;
private int dispatchThreadId = -1;

private SessionDispatcher dispatcher;

public NmsSession(NmsConnection connection, Id sessionId, AcknowledgementMode acknowledgementMode)
{
Expand All @@ -38,28 +35,6 @@ public NmsSession(NmsConnection connection, Id sessionId, AcknowledgementMode ac
};
consumerIdGenerator = new NestedIdGenerator("ID:consumer", SessionInfo.Id, true);
producerIdGenerator = new NestedIdGenerator("ID:producer", SessionInfo.Id, true);

dispatcher = CreateDispatcher();
}

private ActionBlock<NmsMessageConsumer.MessageDeliveryTask> CreateDispatcher()
{
return new ActionBlock<NmsMessageConsumer.MessageDeliveryTask>(messageDeliveryTask =>
{
try
{
dispatchThreadId = Thread.CurrentThread.ManagedThreadId;
messageDeliveryTask.DeliverNextPending();
}
finally
{
dispatchThreadId = -1;
}
}, new ExecutionDataflowBlockOptions
{
EnsureOrdered = true,
MaxDegreeOfParallelism = 1
});
}

public bool IsStarted => started;
Expand Down Expand Up @@ -121,11 +96,6 @@ public IMessageConsumer CreateConsumer(IDestination destination, string selector
NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
if (started)
{
messageConsumer.Start();
}

return messageConsumer;
}

Expand All @@ -137,10 +107,6 @@ public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, s
messageConsumer.Info.IsDurable = true;
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
if (started)
{
messageConsumer.Start();
}

return messageConsumer;
}
Expand Down Expand Up @@ -290,15 +256,6 @@ public void OnInboundMessage(InboundMessageDispatch envelope)
if (consumers.TryGetValue(envelope.ConsumerInfo.Id, out NmsMessageConsumer messageConsumer))
{
messageConsumer.OnInboundMessage(envelope);

try
{
onDispatchThread.Set(true);
}
finally
{
onDispatchThread.Set(false);
}
}
}

Expand Down Expand Up @@ -373,7 +330,7 @@ public void Send(NmsMessageProducer producer, IDestination destination, IMessage

internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
{
dispatcher.Post(task);
dispatcher?.Post(task);
}

public NmsMessageConsumer GetConsumer(Id consumerId)
Expand Down Expand Up @@ -438,6 +395,8 @@ public void Start()
{
if (started.CompareAndSet(false, true))
{
dispatcher = new SessionDispatcher();;

foreach (NmsMessageConsumer consumer in consumers.Values.ToArray())
{
consumer.Start();
Expand All @@ -447,7 +406,7 @@ public void Start()

internal void CheckIsOnDeliveryThread()
{
if (dispatchThreadId == Thread.CurrentThread.ManagedThreadId)
if (dispatcher != null && dispatcher.IsOnDeliveryThread())
{
throw new IllegalStateException("Illegal invocation from MessageListener callback");
}
Expand Down Expand Up @@ -475,5 +434,19 @@ public async Task OnConnectionRecovered(IProvider provider)
await consumer.OnConnectionRecovered(provider).ConfigureAwait(false);
}
}

public void Stop()
{
if (started.CompareAndSet(true, false))
{
dispatcher.Stop();
dispatcher = null;

foreach (NmsMessageConsumer consumer in consumers.Values)
{
consumer.Stop();
}
}
}
}
}
46 changes: 46 additions & 0 deletions src/NMS.AMQP/SessionDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace Apache.NMS.AMQP
{
internal class SessionDispatcher
{
private ActionBlock<NmsMessageConsumer.MessageDeliveryTask> actionBlock;
private int dispatchThreadId;
private readonly CancellationTokenSource cts;

public SessionDispatcher()
{
cts = new CancellationTokenSource();
actionBlock = new ActionBlock<NmsMessageConsumer.MessageDeliveryTask>(HandleTask, new ExecutionDataflowBlockOptions
{
EnsureOrdered = true,
MaxDegreeOfParallelism = 1,
CancellationToken = cts.Token
});
}

public void Post(NmsMessageConsumer.MessageDeliveryTask task) => actionBlock.Post(task);

public bool IsOnDeliveryThread() => dispatchThreadId == Thread.CurrentThread.ManagedThreadId;

private void HandleTask(NmsMessageConsumer.MessageDeliveryTask messageDeliveryTask)
{
try
{
dispatchThreadId = Thread.CurrentThread.ManagedThreadId;
messageDeliveryTask.DeliverNextPending();
}
finally
{
dispatchThreadId = -1;
}
}

public void Stop()
{
cts.Cancel();
cts.Dispose();
}
}
}
84 changes: 75 additions & 9 deletions test/Integration/ConnectionIntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.AMQP;
using Apache.NMS.Util;
using NMS.AMQP.Test.TestAmqp;
using NUnit.Framework;

Expand Down Expand Up @@ -99,10 +100,7 @@ public void TestRemotelyEndConnectionListenerInvoked()
IConnection connection = factory.CreateConnection(User, Password);

bool done = false;
connection.ExceptionListener += exception =>
{
done = true;
};
connection.ExceptionListener += exception => { done = true; };
connection.Start();

// Explicitly close the connection with an error
Expand All @@ -129,32 +127,100 @@ public void TestRemotelyEndConnectionWithSessionWithConsumer()
// Explicitly close the connection with an error
testAmqpPeer.Close();

Assert.That(() => ((NmsConnection)connection).IsConnected, Is.False.After(200, 50), "Connection never closed");
Assert.That(() => ((NmsConnection)connection).IsClosed, Is.True.After(200, 50), "Connection never closed");
Assert.That(() => ((NmsConnection) connection).IsConnected, Is.False.After(200, 50), "Connection never closed");
Assert.That(() => ((NmsConnection) connection).IsClosed, Is.True.After(200, 50), "Connection never closed");

try
{
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
Assert.Fail("Expected ISE to be thrown due to being closed");
}
catch (IllegalStateException) { }
catch (IllegalStateException)
{
}

// Verify the session is now marked closed
try
{
session.CreateConsumer(queue);
Assert.Fail("Expected ISE to be thrown due to being closed");
}
catch (IllegalStateException) { }
catch (IllegalStateException)
{
}

// Verify the consumer is now marked closed
try
{
consumer.Listener += message => { };
Assert.Fail("Expected ISE to be thrown due to being closed");
}
catch (IllegalStateException) { }
catch (IllegalStateException)
{
}

// Try closing them explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.Close();
session.Close();
connection.Close();
}
}

[Test]
public void TestConnectionStartStop()
{
using (TestAmqpPeer testAmqpPeer = new TestAmqpPeer(Address, User, Password))
{
testAmqpPeer.Open();
testAmqpPeer.RegisterMessageSource("myQueue");

NmsConnectionFactory factory = new NmsConnectionFactory(Address);
IConnection connection = factory.CreateConnection(User, Password);

ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IQueue queue = session.GetQueue("myQueue");
var consumer = session.CreateConsumer(queue);

CountDownLatch firstBatch = new CountDownLatch(5);
CountDownLatch secondBatch = new CountDownLatch(5);

consumer.Listener += message =>
{
if (firstBatch.Remaining > 0)
firstBatch.countDown();
else
secondBatch.countDown();
};

// send first batch of messages
for (int i = 0; i < 5; i++)
{
testAmqpPeer.SendMessage("myQueue", $"message{i.ToString()}");
}

connection.Start();

Assert.True(firstBatch.@await(TimeSpan.FromMilliseconds(1000)));

// stop the connection, consumers shouldn't receive any more messages
connection.Stop();

// send second batch of messages
for (int i = 5; i < 10; i++)
{
testAmqpPeer.SendMessage("myQueue", $"message{i.ToString()}");
}

// No messages should arrive to consumer as connection has been stopped
Assert.False(secondBatch.@await(TimeSpan.FromMilliseconds(1000)), "Message arrived despite the fact, that the connection was stopped.");

// restart the connection
connection.Start();

// The second batch of messages should be delivered
Assert.True(secondBatch.@await(TimeSpan.FromMilliseconds(1000)), "No messages arrived.");

// Try closing them explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.Close();
Expand Down
7 changes: 7 additions & 0 deletions test/TestAmqp/TestAmqpPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ public void Dispose()
Close();
}

public void RegisterMessageSource(string address)
{
var messageSource = new TestMessageSource();
containerHost.RegisterMessageSource(address, messageSource);
messageSources.Add(address, messageSource);
}

public void SendMessage(string address, string payload)
{
if (messageSources.TryGetValue(address, out var messageSource))
Expand Down

0 comments on commit 1aef201

Please sign in to comment.