From 3295551e3c2e17e047c60d19ab73a081b2d12ca7 Mon Sep 17 00:00:00 2001 From: Brian Yule Date: Mon, 16 Oct 2017 21:25:54 +0100 Subject: [PATCH 1/6] Batch Publish Batch publish allows sending multiple messages in one stream on the socket. Sending in batches improves performance by reducing the number of TCP/IP messages sent and TCP/IP acknowledgments received. This change compiles the commands for all publish messages into a memory buffer that is posted to the socket as a single stream. Closing the socket/model/connection before the buffer has completed sending does not guarantee message delivery. --- .../src/client/impl/Command.cs | 28 ++++++++ .../src/client/impl/ISession.cs | 4 +- .../src/client/impl/ModelBase.cs | 65 ++++++++++++++++++- .../src/client/impl/SessionBase.cs | 7 +- 4 files changed, 101 insertions(+), 3 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Command.cs b/projects/client/RabbitMQ.Client/src/client/impl/Command.cs index 799b6fa018..5392077fcb 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Command.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Command.cs @@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection) } } + + public void TransmitAsSingleFrame(int channelNumber, Connection connection) { connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method)); @@ -166,5 +168,31 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection) connection.WriteFrameSet(frames); } + + + public static IList CalculateFrames(int channelNumber, Connection connection, IEnumerable commands) + { + List frames = new List(); + + foreach (var cmd in commands) + { + frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method)); + if (cmd.Method.HasContent) + { + var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled. + + frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length)); + var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax); + var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize; + for (int offset = 0; offset < body.Length; offset += bodyPayloadMax) + { + var remaining = body.Length - offset; + var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; + frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count)); + } + } + } + return frames; + } } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs b/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs index cc517e4e56..4392e92cef 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs @@ -39,7 +39,8 @@ //--------------------------------------------------------------------------- using System; - +using System.Collections.Generic; + namespace RabbitMQ.Client.Impl { public interface ISession @@ -79,5 +80,6 @@ public interface ISession void HandleFrame(InboundFrame frame); void Notify(); void Transmit(Command cmd); + void Transmit(IEnumerable cmd); } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 0b66a4cb55..41f0e4a9e4 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -472,6 +472,19 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body) Session.Transmit(new Command(method, header, body)); } } + public void ModelSend(MethodBase method, IEnumerable messages) + { + if (method.HasContent) + { + m_flowControlBlock.WaitOne(); + } + List commands = new List(); + foreach (var message in messages) + { + commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body)); + } + Session.Transmit(commands); + } public virtual void OnBasicAck(BasicAckEventArgs args) { @@ -1050,6 +1063,12 @@ public abstract void _Private_BasicPublish(string exchange, IBasicProperties basicProperties, byte[] body); + //public abstract void _Private_BasicBatchPublish(string exchange, + // string routingKey, + // bool mandatory, + // IEnumerable messages); + + public abstract void _Private_BasicRecover(bool requeue); public abstract void _Private_ChannelClose(ushort replyCode, @@ -1231,8 +1250,52 @@ public void BasicPublish(string exchange, mandatory, basicProperties, body); - } + } + + public void BasicBatchPublish(string exchange, + string routingKey, + bool mandatory, + IEnumerable messages) + { + foreach (var message in messages) + { + if (message.basicProperties == null) + { + message.basicProperties = CreateBasicProperties(); + } + + if (NextPublishSeqNo > 0) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) + { + m_unconfirmedSet.Add(NextPublishSeqNo); + } + NextPublishSeqNo++; + } + } + } + _Private_BasicBatchPublish(exchange, + routingKey, + mandatory, + messages); + } + public void _Private_BasicBatchPublish( +string @exchange, +string @routingKey, +bool @mandatory, +//bool @immediate, +IEnumerable messages) + { + BasicPublish __req = new BasicPublish(); + __req.m_exchange = @exchange; + __req.m_routingKey = @routingKey; + __req.m_mandatory = @mandatory; + //__req.m_immediate = @immediate; + ModelSend(__req, messages); + } public abstract void BasicQos(uint prefetchSize, ushort prefetchCount, bool global); diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs index 7b380137ea..0326d89a08 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs @@ -41,7 +41,8 @@ using System; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; - +using System.Collections.Generic; + namespace RabbitMQ.Client.Impl { public abstract class SessionBase : ISession @@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd) // of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock. cmd.Transmit(ChannelNumber, Connection); } + public virtual void Transmit(IEnumerable commands) + { + Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands)); + } } } From f8f3b47d6591250c28bd1cefd1a555b31bb52172 Mon Sep 17 00:00:00 2001 From: Brian Yule Date: Mon, 16 Oct 2017 22:30:31 +0100 Subject: [PATCH 2/6] Batch Publish Missing Items Addition of API items for BatchPublish --- projects/client/RabbitMQ.Client/src/client/api/IModel.cs | 9 +++++++++ .../RabbitMQ.Client/src/client/api/IModelExtensions.cs | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs index 6642057390..57c6f7d67a 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs @@ -226,6 +226,11 @@ string BasicConsume( void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body); + [AmqpMethodDoNotImplement(null)] + void BasicBatchPublish(string exchange, string routingKey, bool mandatory, + IEnumerable messages); + + /// /// Configures QoS parameters of the Basic content-class. /// @@ -552,4 +557,8 @@ void QueueDeclareNoWait(string queue, bool durable, /// TimeSpan ContinuationTimeout { get; set; } } + public class BatchMessage{ + public byte[] Body { get; set; } + public IBasicProperties basicProperties { get; set; } + } } diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs b/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs index 59afcc4919..d112f57b2e 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs @@ -103,6 +103,10 @@ public static void BasicPublish(this IModel model, string exchange, string routi { model.BasicPublish(exchange, routingKey, false, basicProperties, body); } + public static void BasicBatchPublish(this IModel model, string exchange, string routingKey, IEnumerable messages) + { + model.BasicBatchPublish(exchange, routingKey, false, messages); + } /// /// (Spec method) Convenience overload of BasicPublish. From a21fc483f5b91fc0ac2bf38eee0426e307494b92 Mon Sep 17 00:00:00 2001 From: Brian Yule Date: Mon, 16 Oct 2017 22:37:52 +0100 Subject: [PATCH 3/6] Removing commented code. --- .../src/client/impl/ModelBase.cs | 3172 ++++++++--------- 1 file changed, 1583 insertions(+), 1589 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 41f0e4a9e4..318dfb700c 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -1,1262 +1,1256 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at http://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Exceptions; -using RabbitMQ.Client.Framing; -using RabbitMQ.Client.Framing.Impl; -using RabbitMQ.Util; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Threading; - -#if (NETFX_CORE) -using Trace = System.Diagnostics.Debug; -#endif - -namespace RabbitMQ.Client.Impl -{ - public abstract class ModelBase : IFullModel, IRecoverable - { - public readonly IDictionary m_consumers = new Dictionary(); - - ///Only used to kick-start a connection open - ///sequence. See - public BlockingCell m_connectionStartCell = null; - - private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10); - private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20); - - private RpcContinuationQueue m_continuationQueue = new RpcContinuationQueue(); - private ManualResetEvent m_flowControlBlock = new ManualResetEvent(true); - - private readonly object m_eventLock = new object(); - private readonly object m_flowSendLock = new object(); - private readonly object m_shutdownLock = new object(); - - private readonly SynchronizedList m_unconfirmedSet = new SynchronizedList(); - - private EventHandler m_basicAck; - private EventHandler m_basicNack; - private EventHandler m_basicRecoverOk; - private EventHandler m_basicReturn; - private EventHandler m_callbackException; - private EventHandler m_flowControl; - private EventHandler m_modelShutdown; - - private bool m_onlyAcksReceived = true; - - private EventHandler m_recovery; - - public IConsumerDispatcher ConsumerDispatcher { get; private set; } - - public ModelBase(ISession session) - : this(session, session.Connection.ConsumerWorkService) - { } - - public ModelBase(ISession session, ConsumerWorkService workService) - { - var asyncConsumerWorkService = workService as AsyncConsumerWorkService; - if (asyncConsumerWorkService != null) - { - ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService); - } - else - { - ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService); - } - - Initialise(session); - } - - protected void Initialise(ISession session) - { - CloseReason = null; - NextPublishSeqNo = 0; - Session = session; - Session.CommandReceived = HandleCommand; - Session.SessionShutdown += OnSessionShutdown; - } - - public TimeSpan HandshakeContinuationTimeout - { - get { return m_handshakeContinuationTimeout; } - set { m_handshakeContinuationTimeout = value; } - } - - public TimeSpan ContinuationTimeout - { - get { return m_continuationTimeout; } - set { m_continuationTimeout = value; } - } - - public event EventHandler BasicAcks - { - add - { - lock (m_eventLock) - { - m_basicAck += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicAck -= value; - } - } - } - - public event EventHandler BasicNacks - { - add - { - lock (m_eventLock) - { - m_basicNack += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicNack -= value; - } - } - } - - public event EventHandler BasicRecoverOk - { - add - { - lock (m_eventLock) - { - m_basicRecoverOk += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicRecoverOk -= value; - } - } - } - - public event EventHandler BasicReturn - { - add - { - lock (m_eventLock) - { - m_basicReturn += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicReturn -= value; - } - } - } - - public event EventHandler CallbackException - { - add - { - lock (m_eventLock) - { - m_callbackException += value; - } - } - remove - { - lock (m_eventLock) - { - m_callbackException -= value; - } - } - } - - public event EventHandler FlowControl - { - add - { - lock (m_eventLock) - { - m_flowControl += value; - } - } - remove - { - lock (m_eventLock) - { - m_flowControl -= value; - } - } - } - - public event EventHandler ModelShutdown - { - add - { - bool ok = false; - if (CloseReason == null) - { - lock (m_shutdownLock) - { - if (CloseReason == null) - { - m_modelShutdown += value; - ok = true; - } - } - } - if (!ok) - { - value(this, CloseReason); - } - } - remove - { - lock (m_shutdownLock) - { - m_modelShutdown -= value; - } - } - } - - public event EventHandler Recovery - { - add - { - lock (m_eventLock) - { - m_recovery += value; - } - } - remove - { - lock (m_eventLock) - { - m_recovery -= value; - } - } - } - - public int ChannelNumber - { - get { return ((Session)Session).ChannelNumber; } - } - - public ShutdownEventArgs CloseReason { get; private set; } - - public IBasicConsumer DefaultConsumer { get; set; } - - public bool IsClosed - { - get { return !IsOpen; } - } - - public bool IsOpen - { - get { return CloseReason == null; } - } - - public ulong NextPublishSeqNo { get; private set; } - - public ISession Session { get; private set; } - - public void Close(ushort replyCode, string replyText, bool abort) - { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, - replyCode, replyText), - abort); - } - - public void Close(ShutdownEventArgs reason, bool abort) - { - var k = new ShutdownContinuation(); - ModelShutdown += k.OnConnectionShutdown; - - try - { - ConsumerDispatcher.Quiesce(); - if (SetCloseReason(reason)) - { - _Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0); - } - k.Wait(TimeSpan.FromMilliseconds(10000)); - ConsumerDispatcher.Shutdown(this); - } - catch (AlreadyClosedException) - { - if (!abort) - { - throw; - } - } - catch (IOException) - { - if (!abort) - { - throw; - } - } - catch (Exception) - { - if (!abort) - { - throw; - } - } - } - - public string ConnectionOpen(string virtualHost, - string capabilities, - bool insist) - { - var k = new ConnectionOpenContinuation(); - Enqueue(k); - try - { - _Private_ConnectionOpen(virtualHost, capabilities, insist); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - k.GetReply(HandshakeContinuationTimeout); - return k.m_knownHosts; - } - - public ConnectionSecureOrTune ConnectionSecureOk(byte[] response) - { - var k = new ConnectionStartRpcContinuation(); - Enqueue(k); - try - { - _Private_ConnectionSecureOk(response); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - k.GetReply(HandshakeContinuationTimeout); - return k.m_result; - } - - public ConnectionSecureOrTune ConnectionStartOk(IDictionary clientProperties, - string mechanism, - byte[] response, - string locale) - { - var k = new ConnectionStartRpcContinuation(); - Enqueue(k); - try - { - _Private_ConnectionStartOk(clientProperties, mechanism, - response, locale); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - k.GetReply(HandshakeContinuationTimeout); - return k.m_result; - } - - public abstract bool DispatchAsynchronous(Command cmd); - - public void Enqueue(IRpcContinuation k) - { - bool ok = false; - if (CloseReason == null) - { - lock (m_shutdownLock) - { - if (CloseReason == null) - { - m_continuationQueue.Enqueue(k); - ok = true; - } - } - } - if (!ok) - { - k.HandleModelShutdown(CloseReason); - } - } - - public void FinishClose() - { - if (CloseReason != null) - { - Session.Close(CloseReason); - } - if (m_connectionStartCell != null) - { - m_connectionStartCell.Value = null; - } - } - - public void HandleCommand(ISession session, Command cmd) - { - if (!DispatchAsynchronous(cmd))// Was asynchronous. Already processed. No need to process further. - m_continuationQueue.Next().HandleCommand(cmd); - } - - public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] body) - { - var k = new SimpleBlockingRpcContinuation(); - TransmitAndEnqueue(new Command(method, header, body), k); - return k.GetReply(this.ContinuationTimeout).Method; - } - - public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body) - { - if (method.HasContent) - { - m_flowControlBlock.WaitOne(); - Session.Transmit(new Command(method, header, body)); - } - else - { - Session.Transmit(new Command(method, header, body)); - } - } - public void ModelSend(MethodBase method, IEnumerable messages) - { - if (method.HasContent) - { - m_flowControlBlock.WaitOne(); - } - List commands = new List(); +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 1.1. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2016 Pivotal Software, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v1.1: +// +//--------------------------------------------------------------------------- +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Framing.Impl; +using RabbitMQ.Util; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; + +#if (NETFX_CORE) +using Trace = System.Diagnostics.Debug; +#endif + +namespace RabbitMQ.Client.Impl +{ + public abstract class ModelBase : IFullModel, IRecoverable + { + public readonly IDictionary m_consumers = new Dictionary(); + + ///Only used to kick-start a connection open + ///sequence. See + public BlockingCell m_connectionStartCell = null; + + private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10); + private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20); + + private RpcContinuationQueue m_continuationQueue = new RpcContinuationQueue(); + private ManualResetEvent m_flowControlBlock = new ManualResetEvent(true); + + private readonly object m_eventLock = new object(); + private readonly object m_flowSendLock = new object(); + private readonly object m_shutdownLock = new object(); + + private readonly SynchronizedList m_unconfirmedSet = new SynchronizedList(); + + private EventHandler m_basicAck; + private EventHandler m_basicNack; + private EventHandler m_basicRecoverOk; + private EventHandler m_basicReturn; + private EventHandler m_callbackException; + private EventHandler m_flowControl; + private EventHandler m_modelShutdown; + + private bool m_onlyAcksReceived = true; + + private EventHandler m_recovery; + + public IConsumerDispatcher ConsumerDispatcher { get; private set; } + + public ModelBase(ISession session) + : this(session, session.Connection.ConsumerWorkService) + { } + + public ModelBase(ISession session, ConsumerWorkService workService) + { + var asyncConsumerWorkService = workService as AsyncConsumerWorkService; + if (asyncConsumerWorkService != null) + { + ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService); + } + else + { + ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService); + } + + Initialise(session); + } + + protected void Initialise(ISession session) + { + CloseReason = null; + NextPublishSeqNo = 0; + Session = session; + Session.CommandReceived = HandleCommand; + Session.SessionShutdown += OnSessionShutdown; + } + + public TimeSpan HandshakeContinuationTimeout + { + get { return m_handshakeContinuationTimeout; } + set { m_handshakeContinuationTimeout = value; } + } + + public TimeSpan ContinuationTimeout + { + get { return m_continuationTimeout; } + set { m_continuationTimeout = value; } + } + + public event EventHandler BasicAcks + { + add + { + lock (m_eventLock) + { + m_basicAck += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicAck -= value; + } + } + } + + public event EventHandler BasicNacks + { + add + { + lock (m_eventLock) + { + m_basicNack += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicNack -= value; + } + } + } + + public event EventHandler BasicRecoverOk + { + add + { + lock (m_eventLock) + { + m_basicRecoverOk += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicRecoverOk -= value; + } + } + } + + public event EventHandler BasicReturn + { + add + { + lock (m_eventLock) + { + m_basicReturn += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicReturn -= value; + } + } + } + + public event EventHandler CallbackException + { + add + { + lock (m_eventLock) + { + m_callbackException += value; + } + } + remove + { + lock (m_eventLock) + { + m_callbackException -= value; + } + } + } + + public event EventHandler FlowControl + { + add + { + lock (m_eventLock) + { + m_flowControl += value; + } + } + remove + { + lock (m_eventLock) + { + m_flowControl -= value; + } + } + } + + public event EventHandler ModelShutdown + { + add + { + bool ok = false; + if (CloseReason == null) + { + lock (m_shutdownLock) + { + if (CloseReason == null) + { + m_modelShutdown += value; + ok = true; + } + } + } + if (!ok) + { + value(this, CloseReason); + } + } + remove + { + lock (m_shutdownLock) + { + m_modelShutdown -= value; + } + } + } + + public event EventHandler Recovery + { + add + { + lock (m_eventLock) + { + m_recovery += value; + } + } + remove + { + lock (m_eventLock) + { + m_recovery -= value; + } + } + } + + public int ChannelNumber + { + get { return ((Session)Session).ChannelNumber; } + } + + public ShutdownEventArgs CloseReason { get; private set; } + + public IBasicConsumer DefaultConsumer { get; set; } + + public bool IsClosed + { + get { return !IsOpen; } + } + + public bool IsOpen + { + get { return CloseReason == null; } + } + + public ulong NextPublishSeqNo { get; private set; } + + public ISession Session { get; private set; } + + public void Close(ushort replyCode, string replyText, bool abort) + { + Close(new ShutdownEventArgs(ShutdownInitiator.Application, + replyCode, replyText), + abort); + } + + public void Close(ShutdownEventArgs reason, bool abort) + { + var k = new ShutdownContinuation(); + ModelShutdown += k.OnConnectionShutdown; + + try + { + ConsumerDispatcher.Quiesce(); + if (SetCloseReason(reason)) + { + _Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0); + } + k.Wait(TimeSpan.FromMilliseconds(10000)); + ConsumerDispatcher.Shutdown(this); + } + catch (AlreadyClosedException) + { + if (!abort) + { + throw; + } + } + catch (IOException) + { + if (!abort) + { + throw; + } + } + catch (Exception) + { + if (!abort) + { + throw; + } + } + } + + public string ConnectionOpen(string virtualHost, + string capabilities, + bool insist) + { + var k = new ConnectionOpenContinuation(); + Enqueue(k); + try + { + _Private_ConnectionOpen(virtualHost, capabilities, insist); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } + k.GetReply(HandshakeContinuationTimeout); + return k.m_knownHosts; + } + + public ConnectionSecureOrTune ConnectionSecureOk(byte[] response) + { + var k = new ConnectionStartRpcContinuation(); + Enqueue(k); + try + { + _Private_ConnectionSecureOk(response); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } + k.GetReply(HandshakeContinuationTimeout); + return k.m_result; + } + + public ConnectionSecureOrTune ConnectionStartOk(IDictionary clientProperties, + string mechanism, + byte[] response, + string locale) + { + var k = new ConnectionStartRpcContinuation(); + Enqueue(k); + try + { + _Private_ConnectionStartOk(clientProperties, mechanism, + response, locale); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } + k.GetReply(HandshakeContinuationTimeout); + return k.m_result; + } + + public abstract bool DispatchAsynchronous(Command cmd); + + public void Enqueue(IRpcContinuation k) + { + bool ok = false; + if (CloseReason == null) + { + lock (m_shutdownLock) + { + if (CloseReason == null) + { + m_continuationQueue.Enqueue(k); + ok = true; + } + } + } + if (!ok) + { + k.HandleModelShutdown(CloseReason); + } + } + + public void FinishClose() + { + if (CloseReason != null) + { + Session.Close(CloseReason); + } + if (m_connectionStartCell != null) + { + m_connectionStartCell.Value = null; + } + } + + public void HandleCommand(ISession session, Command cmd) + { + if (!DispatchAsynchronous(cmd))// Was asynchronous. Already processed. No need to process further. + m_continuationQueue.Next().HandleCommand(cmd); + } + + public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] body) + { + var k = new SimpleBlockingRpcContinuation(); + TransmitAndEnqueue(new Command(method, header, body), k); + return k.GetReply(this.ContinuationTimeout).Method; + } + + public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body) + { + if (method.HasContent) + { + m_flowControlBlock.WaitOne(); + Session.Transmit(new Command(method, header, body)); + } + else + { + Session.Transmit(new Command(method, header, body)); + } + } + public void ModelSend(MethodBase method, IEnumerable messages) + { + if (method.HasContent) + { + m_flowControlBlock.WaitOne(); + } + List commands = new List(); foreach (var message in messages) { - commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body)); + commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body)); + } + Session.Transmit(commands); + } + + public virtual void OnBasicAck(BasicAckEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicAck; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicAck")); + } + } + } + + handleAckNack(args.DeliveryTag, args.Multiple, false); + } + + public virtual void OnBasicNack(BasicNackEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicNack; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicNack")); + } + } + } + + handleAckNack(args.DeliveryTag, args.Multiple, true); + } + + public virtual void OnBasicRecoverOk(EventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicRecoverOk; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicRecover")); + } + } + } + } + + public virtual void OnBasicReturn(BasicReturnEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicReturn; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicReturn")); + } + } + } + } + + public virtual void OnCallbackException(CallbackExceptionEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_callbackException; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch + { + // Exception in + // Callback-exception-handler. That was the + // app's last chance. Swallow the exception. + // FIXME: proper logging + } + } + } + } + + public virtual void OnFlowControl(FlowControlEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_flowControl; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnFlowControl")); + } + } + } + } + + ///Broadcasts notification of the final shutdown of the model. + /// + /// + ///Do not call anywhere other than at the end of OnSessionShutdown. + /// + /// + ///Must not be called when m_closeReason == null, because + ///otherwise there's a window when a new continuation could be + ///being enqueued at the same time as we're broadcasting the + ///shutdown event. See the definition of Enqueue() above. + /// + /// + public virtual void OnModelShutdown(ShutdownEventArgs reason) + { + m_continuationQueue.HandleModelShutdown(reason); + EventHandler handler; + lock (m_shutdownLock) + { + handler = m_modelShutdown; + m_modelShutdown = null; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, reason); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnModelShutdown")); + } + } + } + lock (m_unconfirmedSet.SyncRoot) + Monitor.Pulse(m_unconfirmedSet.SyncRoot); + m_flowControlBlock.Set(); + } + + public void OnSessionShutdown(object sender, ShutdownEventArgs reason) + { + this.ConsumerDispatcher.Quiesce(); + SetCloseReason(reason); + OnModelShutdown(reason); + BroadcastShutdownToConsumers(m_consumers, reason); + this.ConsumerDispatcher.Shutdown(this); + } + + protected void BroadcastShutdownToConsumers(IDictionary cs, ShutdownEventArgs reason) + { + foreach (var c in cs) + { + this.ConsumerDispatcher.HandleModelShutdown(c.Value, reason); + } + } + + public bool SetCloseReason(ShutdownEventArgs reason) + { + if (CloseReason == null) + { + lock (m_shutdownLock) + { + if (CloseReason == null) + { + CloseReason = reason; + return true; + } + else + { + return false; + } + } + } + else + return false; + } + + public override string ToString() + { + return Session.ToString(); + } + + public void TransmitAndEnqueue(Command cmd, IRpcContinuation k) + { + Enqueue(k); + Session.Transmit(cmd); + } + + void IDisposable.Dispose() + { + Abort(); + } + + public abstract void ConnectionTuneOk(ushort channelMax, + uint frameMax, + ushort heartbeat); + + public void HandleBasicAck(ulong deliveryTag, + bool multiple) + { + var e = new BasicAckEventArgs + { + DeliveryTag = deliveryTag, + Multiple = multiple + }; + OnBasicAck(e); + } + + public void HandleBasicCancel(string consumerTag, bool nowait) + { + IBasicConsumer consumer; + lock (m_consumers) + { + consumer = m_consumers[consumerTag]; + m_consumers.Remove(consumerTag); + } + if (consumer == null) + { + consumer = DefaultConsumer; + } + ConsumerDispatcher.HandleBasicCancel(consumer, consumerTag); + } + + public void HandleBasicCancelOk(string consumerTag) + { + var k = + (BasicConsumerRpcContinuation)m_continuationQueue.Next(); +/* + Trace.Assert(k.m_consumerTag == consumerTag, string.Format( + "Consumer tag mismatch during cancel: {0} != {1}", + k.m_consumerTag, + consumerTag + )); +*/ + lock (m_consumers) + { + k.m_consumer = m_consumers[consumerTag]; + m_consumers.Remove(consumerTag); + } + ConsumerDispatcher.HandleBasicCancelOk(k.m_consumer, consumerTag); + k.HandleCommand(null); // release the continuation. + } + + public void HandleBasicConsumeOk(string consumerTag) + { + var k = + (BasicConsumerRpcContinuation)m_continuationQueue.Next(); + k.m_consumerTag = consumerTag; + lock (m_consumers) + { + m_consumers[consumerTag] = k.m_consumer; + } + ConsumerDispatcher.HandleBasicConsumeOk(k.m_consumer, consumerTag); + k.HandleCommand(null); // release the continuation. + } + + public virtual void HandleBasicDeliver(string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + IBasicProperties basicProperties, + byte[] body) + { + IBasicConsumer consumer; + lock (m_consumers) + { + consumer = m_consumers[consumerTag]; + } + if (consumer == null) + { + if (DefaultConsumer == null) + { + throw new InvalidOperationException("Unsolicited delivery -" + + " see IModel.DefaultConsumer to handle this" + + " case."); + } + else + { + consumer = DefaultConsumer; + } + } + + ConsumerDispatcher.HandleBasicDeliver(consumer, + consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey, + basicProperties, + body); + } + + public void HandleBasicGetEmpty() + { + var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); + k.m_result = null; + k.HandleCommand(null); // release the continuation. + } + + public virtual void HandleBasicGetOk(ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + uint messageCount, + IBasicProperties basicProperties, + byte[] body) + { + var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); + k.m_result = new BasicGetResult(deliveryTag, + redelivered, + exchange, + routingKey, + messageCount, + basicProperties, + body); + k.HandleCommand(null); // release the continuation. + } + + public void HandleBasicNack(ulong deliveryTag, + bool multiple, + bool requeue) + { + var e = new BasicNackEventArgs(); + e.DeliveryTag = deliveryTag; + e.Multiple = multiple; + e.Requeue = requeue; + OnBasicNack(e); + } + + public void HandleBasicRecoverOk() + { + var k = (SimpleBlockingRpcContinuation)m_continuationQueue.Next(); + OnBasicRecoverOk(new EventArgs()); + k.HandleCommand(null); + } + + public void HandleBasicReturn(ushort replyCode, + string replyText, + string exchange, + string routingKey, + IBasicProperties basicProperties, + byte[] body) + { + var e = new BasicReturnEventArgs(); + e.ReplyCode = replyCode; + e.ReplyText = replyText; + e.Exchange = exchange; + e.RoutingKey = routingKey; + e.BasicProperties = basicProperties; + e.Body = body; + OnBasicReturn(e); + } + + public void HandleChannelClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId) + { + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + replyCode, + replyText, + classId, + methodId)); + + Session.Close(CloseReason, false); + try + { + _Private_ChannelCloseOk(); + } + finally + { + Session.Notify(); + } + } + + public void HandleChannelCloseOk() + { + FinishClose(); + } + + public void HandleChannelFlow(bool active) + { + if (active) + { + m_flowControlBlock.Set(); + _Private_ChannelFlowOk(active); + } + else + { + m_flowControlBlock.Reset(); + _Private_ChannelFlowOk(active); + } + OnFlowControl(new FlowControlEventArgs(active)); + } + + public void HandleConnectionBlocked(string reason) + { + var cb = ((Connection)Session.Connection); + + cb.HandleConnectionBlocked(reason); + } + + public void HandleConnectionClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId) + { + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, + replyCode, + replyText, + classId, + methodId); + try + { + ((Connection)Session.Connection).InternalClose(reason); + _Private_ConnectionCloseOk(); + SetCloseReason((Session.Connection).CloseReason); + } + catch (IOException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + catch (AlreadyClosedException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + } + + public void HandleConnectionOpenOk(string knownHosts) + { + var k = (ConnectionOpenContinuation)m_continuationQueue.Next(); + k.m_redirect = false; + k.m_host = null; + k.m_knownHosts = knownHosts; + k.HandleCommand(null); // release the continuation. + } + + public void HandleConnectionSecure(byte[] challenge) + { + var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); + k.m_result = new ConnectionSecureOrTune + { + m_challenge = challenge + }; + k.HandleCommand(null); // release the continuation. + } + + public void HandleConnectionStart(byte versionMajor, + byte versionMinor, + IDictionary serverProperties, + byte[] mechanisms, + byte[] locales) + { + if (m_connectionStartCell == null) + { + var reason = + new ShutdownEventArgs(ShutdownInitiator.Library, + Constants.CommandInvalid, + "Unexpected Connection.Start"); + ((Connection)Session.Connection).Close(reason); + } + var details = new ConnectionStartDetails + { + m_versionMajor = versionMajor, + m_versionMinor = versionMinor, + m_serverProperties = serverProperties, + m_mechanisms = mechanisms, + m_locales = locales + }; + m_connectionStartCell.Value = details; + m_connectionStartCell = null; + } + + ///Handle incoming Connection.Tune + ///methods. + public void HandleConnectionTune(ushort channelMax, + uint frameMax, + ushort heartbeat) + { + var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); + k.m_result = new ConnectionSecureOrTune + { + m_tuneDetails = + { + m_channelMax = channelMax, + m_frameMax = frameMax, + m_heartbeat = heartbeat + } + }; + k.HandleCommand(null); // release the continuation. + } + + public void HandleConnectionUnblocked() + { + var cb = ((Connection)Session.Connection); + + cb.HandleConnectionUnblocked(); + } + + public void HandleQueueDeclareOk(string queue, + uint messageCount, + uint consumerCount) + { + var k = (QueueDeclareRpcContinuation)m_continuationQueue.Next(); + k.m_result = new QueueDeclareOk(queue, messageCount, consumerCount); + k.HandleCommand(null); // release the continuation. + } + + public abstract void _Private_BasicCancel(string consumerTag, + bool nowait); + + public abstract void _Private_BasicConsume(string queue, + string consumerTag, + bool noLocal, + bool autoAck, + bool exclusive, + bool nowait, + IDictionary arguments); + + public abstract void _Private_BasicGet(string queue, + bool autoAck); + + public abstract void _Private_BasicPublish(string exchange, + string routingKey, + bool mandatory, + IBasicProperties basicProperties, + byte[] body); + + public abstract void _Private_BasicRecover(bool requeue); + + public abstract void _Private_ChannelClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId); + + public abstract void _Private_ChannelCloseOk(); + + public abstract void _Private_ChannelFlowOk(bool active); + + public abstract void _Private_ChannelOpen(string outOfBand); + + public abstract void _Private_ConfirmSelect(bool nowait); + + public abstract void _Private_ConnectionClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId); + + public abstract void _Private_ConnectionCloseOk(); + + public abstract void _Private_ConnectionOpen(string virtualHost, + string capabilities, + bool insist); + + public abstract void _Private_ConnectionSecureOk(byte[] response); + + public abstract void _Private_ConnectionStartOk(IDictionary clientProperties, + string mechanism, + byte[] response, + string locale); + + public abstract void _Private_ExchangeBind(string destination, + string source, + string routingKey, + bool nowait, + IDictionary arguments); + + public abstract void _Private_ExchangeDeclare(string exchange, + string type, + bool passive, + bool durable, + bool autoDelete, + bool @internal, + bool nowait, + IDictionary arguments); + + public abstract void _Private_ExchangeDelete(string exchange, + bool ifUnused, + bool nowait); + + public abstract void _Private_ExchangeUnbind(string destination, + string source, + string routingKey, + bool nowait, + IDictionary arguments); + + public abstract void _Private_QueueBind(string queue, + string exchange, + string routingKey, + bool nowait, + IDictionary arguments); + + public abstract void _Private_QueueDeclare(string queue, + bool passive, + bool durable, + bool exclusive, + bool autoDelete, + bool nowait, + IDictionary arguments); + + public abstract uint _Private_QueueDelete(string queue, + bool ifUnused, + bool ifEmpty, + bool nowait); + + public abstract uint _Private_QueuePurge(string queue, + bool nowait); + + public void Abort() + { + Abort(Constants.ReplySuccess, "Goodbye"); + } + + public void Abort(ushort replyCode, string replyText) + { + Close(replyCode, replyText, true); + } + + public abstract void BasicAck(ulong deliveryTag, bool multiple); + + public void BasicCancel(string consumerTag) + { + var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag }; + + Enqueue(k); + + _Private_BasicCancel(consumerTag, false); + k.GetReply(this.ContinuationTimeout); + lock (m_consumers) + { + m_consumers.Remove(consumerTag); + } + + ModelShutdown -= k.m_consumer.HandleModelShutdown; + } + + public string BasicConsume(string queue, + bool autoAck, + string consumerTag, + bool noLocal, + bool exclusive, + IDictionary arguments, + IBasicConsumer consumer) + { + // TODO: Replace with flag + var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher; + if (asyncDispatcher != null) + { + var asyncConsumer = consumer as IAsyncBasicConsumer; + if (asyncConsumer == null) + { + // TODO: Friendly message + throw new InvalidOperationException("In the async mode you have to use an async consumer"); + } + } + + var k = new BasicConsumerRpcContinuation { m_consumer = consumer }; + + Enqueue(k); + // Non-nowait. We have an unconventional means of getting + // the RPC response, but a response is still expected. + _Private_BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, + /*nowait:*/ false, arguments); + k.GetReply(this.ContinuationTimeout); + string actualConsumerTag = k.m_consumerTag; + + return actualConsumerTag; + } + + public BasicGetResult BasicGet(string queue, + bool autoAck) + { + var k = new BasicGetRpcContinuation(); + Enqueue(k); + _Private_BasicGet(queue, autoAck); + k.GetReply(this.ContinuationTimeout); + return k.m_result; + } + + public abstract void BasicNack(ulong deliveryTag, + bool multiple, + bool requeue); + + public void BasicPublish(string exchange, + string routingKey, + bool mandatory, + IBasicProperties basicProperties, + byte[] body) + { + if (basicProperties == null) + { + basicProperties = CreateBasicProperties(); + } + if (NextPublishSeqNo > 0) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) + { + m_unconfirmedSet.Add(NextPublishSeqNo); + } + NextPublishSeqNo++; + } } - Session.Transmit(commands); - } - - public virtual void OnBasicAck(BasicAckEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicAck; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicAck")); - } - } - } - - handleAckNack(args.DeliveryTag, args.Multiple, false); - } - - public virtual void OnBasicNack(BasicNackEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicNack; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicNack")); - } - } - } - - handleAckNack(args.DeliveryTag, args.Multiple, true); - } - - public virtual void OnBasicRecoverOk(EventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicRecoverOk; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicRecover")); - } - } - } - } - - public virtual void OnBasicReturn(BasicReturnEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicReturn; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicReturn")); - } - } - } - } - - public virtual void OnCallbackException(CallbackExceptionEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_callbackException; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch - { - // Exception in - // Callback-exception-handler. That was the - // app's last chance. Swallow the exception. - // FIXME: proper logging - } - } - } - } - - public virtual void OnFlowControl(FlowControlEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_flowControl; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnFlowControl")); - } - } - } - } - - ///Broadcasts notification of the final shutdown of the model. - /// - /// - ///Do not call anywhere other than at the end of OnSessionShutdown. - /// - /// - ///Must not be called when m_closeReason == null, because - ///otherwise there's a window when a new continuation could be - ///being enqueued at the same time as we're broadcasting the - ///shutdown event. See the definition of Enqueue() above. - /// - /// - public virtual void OnModelShutdown(ShutdownEventArgs reason) - { - m_continuationQueue.HandleModelShutdown(reason); - EventHandler handler; - lock (m_shutdownLock) - { - handler = m_modelShutdown; - m_modelShutdown = null; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, reason); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnModelShutdown")); - } - } - } - lock (m_unconfirmedSet.SyncRoot) - Monitor.Pulse(m_unconfirmedSet.SyncRoot); - m_flowControlBlock.Set(); - } - - public void OnSessionShutdown(object sender, ShutdownEventArgs reason) - { - this.ConsumerDispatcher.Quiesce(); - SetCloseReason(reason); - OnModelShutdown(reason); - BroadcastShutdownToConsumers(m_consumers, reason); - this.ConsumerDispatcher.Shutdown(this); - } - - protected void BroadcastShutdownToConsumers(IDictionary cs, ShutdownEventArgs reason) - { - foreach (var c in cs) - { - this.ConsumerDispatcher.HandleModelShutdown(c.Value, reason); - } - } - - public bool SetCloseReason(ShutdownEventArgs reason) - { - if (CloseReason == null) - { - lock (m_shutdownLock) - { - if (CloseReason == null) - { - CloseReason = reason; - return true; - } - else - { - return false; - } - } - } - else - return false; - } - - public override string ToString() - { - return Session.ToString(); - } - - public void TransmitAndEnqueue(Command cmd, IRpcContinuation k) - { - Enqueue(k); - Session.Transmit(cmd); - } - - void IDisposable.Dispose() - { - Abort(); - } - - public abstract void ConnectionTuneOk(ushort channelMax, - uint frameMax, - ushort heartbeat); - - public void HandleBasicAck(ulong deliveryTag, - bool multiple) - { - var e = new BasicAckEventArgs - { - DeliveryTag = deliveryTag, - Multiple = multiple - }; - OnBasicAck(e); - } - - public void HandleBasicCancel(string consumerTag, bool nowait) - { - IBasicConsumer consumer; - lock (m_consumers) - { - consumer = m_consumers[consumerTag]; - m_consumers.Remove(consumerTag); - } - if (consumer == null) - { - consumer = DefaultConsumer; - } - ConsumerDispatcher.HandleBasicCancel(consumer, consumerTag); - } - - public void HandleBasicCancelOk(string consumerTag) - { - var k = - (BasicConsumerRpcContinuation)m_continuationQueue.Next(); -/* - Trace.Assert(k.m_consumerTag == consumerTag, string.Format( - "Consumer tag mismatch during cancel: {0} != {1}", - k.m_consumerTag, - consumerTag - )); -*/ - lock (m_consumers) - { - k.m_consumer = m_consumers[consumerTag]; - m_consumers.Remove(consumerTag); - } - ConsumerDispatcher.HandleBasicCancelOk(k.m_consumer, consumerTag); - k.HandleCommand(null); // release the continuation. - } - - public void HandleBasicConsumeOk(string consumerTag) - { - var k = - (BasicConsumerRpcContinuation)m_continuationQueue.Next(); - k.m_consumerTag = consumerTag; - lock (m_consumers) - { - m_consumers[consumerTag] = k.m_consumer; - } - ConsumerDispatcher.HandleBasicConsumeOk(k.m_consumer, consumerTag); - k.HandleCommand(null); // release the continuation. - } - - public virtual void HandleBasicDeliver(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IBasicProperties basicProperties, - byte[] body) - { - IBasicConsumer consumer; - lock (m_consumers) - { - consumer = m_consumers[consumerTag]; - } - if (consumer == null) - { - if (DefaultConsumer == null) - { - throw new InvalidOperationException("Unsolicited delivery -" + - " see IModel.DefaultConsumer to handle this" + - " case."); - } - else - { - consumer = DefaultConsumer; - } - } - - ConsumerDispatcher.HandleBasicDeliver(consumer, - consumerTag, - deliveryTag, - redelivered, - exchange, - routingKey, - basicProperties, - body); - } - - public void HandleBasicGetEmpty() - { - var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); - k.m_result = null; - k.HandleCommand(null); // release the continuation. - } - - public virtual void HandleBasicGetOk(ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - uint messageCount, - IBasicProperties basicProperties, - byte[] body) - { - var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); - k.m_result = new BasicGetResult(deliveryTag, - redelivered, - exchange, - routingKey, - messageCount, - basicProperties, - body); - k.HandleCommand(null); // release the continuation. - } - - public void HandleBasicNack(ulong deliveryTag, - bool multiple, - bool requeue) - { - var e = new BasicNackEventArgs(); - e.DeliveryTag = deliveryTag; - e.Multiple = multiple; - e.Requeue = requeue; - OnBasicNack(e); - } - - public void HandleBasicRecoverOk() - { - var k = (SimpleBlockingRpcContinuation)m_continuationQueue.Next(); - OnBasicRecoverOk(new EventArgs()); - k.HandleCommand(null); - } - - public void HandleBasicReturn(ushort replyCode, - string replyText, - string exchange, - string routingKey, - IBasicProperties basicProperties, - byte[] body) - { - var e = new BasicReturnEventArgs(); - e.ReplyCode = replyCode; - e.ReplyText = replyText; - e.Exchange = exchange; - e.RoutingKey = routingKey; - e.BasicProperties = basicProperties; - e.Body = body; - OnBasicReturn(e); - } - - public void HandleChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - replyCode, - replyText, - classId, - methodId)); - - Session.Close(CloseReason, false); - try - { - _Private_ChannelCloseOk(); - } - finally - { - Session.Notify(); - } - } - - public void HandleChannelCloseOk() - { - FinishClose(); - } - - public void HandleChannelFlow(bool active) - { - if (active) - { - m_flowControlBlock.Set(); - _Private_ChannelFlowOk(active); - } - else - { - m_flowControlBlock.Reset(); - _Private_ChannelFlowOk(active); - } - OnFlowControl(new FlowControlEventArgs(active)); - } - - public void HandleConnectionBlocked(string reason) - { - var cb = ((Connection)Session.Connection); - - cb.HandleConnectionBlocked(reason); - } - - public void HandleConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, - replyCode, - replyText, - classId, - methodId); - try - { - ((Connection)Session.Connection).InternalClose(reason); - _Private_ConnectionCloseOk(); - SetCloseReason((Session.Connection).CloseReason); - } - catch (IOException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - catch (AlreadyClosedException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - } - - public void HandleConnectionOpenOk(string knownHosts) - { - var k = (ConnectionOpenContinuation)m_continuationQueue.Next(); - k.m_redirect = false; - k.m_host = null; - k.m_knownHosts = knownHosts; - k.HandleCommand(null); // release the continuation. - } - - public void HandleConnectionSecure(byte[] challenge) - { - var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); - k.m_result = new ConnectionSecureOrTune - { - m_challenge = challenge - }; - k.HandleCommand(null); // release the continuation. - } - - public void HandleConnectionStart(byte versionMajor, - byte versionMinor, - IDictionary serverProperties, - byte[] mechanisms, - byte[] locales) - { - if (m_connectionStartCell == null) - { - var reason = - new ShutdownEventArgs(ShutdownInitiator.Library, - Constants.CommandInvalid, - "Unexpected Connection.Start"); - ((Connection)Session.Connection).Close(reason); - } - var details = new ConnectionStartDetails - { - m_versionMajor = versionMajor, - m_versionMinor = versionMinor, - m_serverProperties = serverProperties, - m_mechanisms = mechanisms, - m_locales = locales - }; - m_connectionStartCell.Value = details; - m_connectionStartCell = null; - } - - ///Handle incoming Connection.Tune - ///methods. - public void HandleConnectionTune(ushort channelMax, - uint frameMax, - ushort heartbeat) - { - var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); - k.m_result = new ConnectionSecureOrTune - { - m_tuneDetails = - { - m_channelMax = channelMax, - m_frameMax = frameMax, - m_heartbeat = heartbeat - } - }; - k.HandleCommand(null); // release the continuation. - } - - public void HandleConnectionUnblocked() - { - var cb = ((Connection)Session.Connection); - - cb.HandleConnectionUnblocked(); - } - - public void HandleQueueDeclareOk(string queue, - uint messageCount, - uint consumerCount) - { - var k = (QueueDeclareRpcContinuation)m_continuationQueue.Next(); - k.m_result = new QueueDeclareOk(queue, messageCount, consumerCount); - k.HandleCommand(null); // release the continuation. - } - - public abstract void _Private_BasicCancel(string consumerTag, - bool nowait); - - public abstract void _Private_BasicConsume(string queue, - string consumerTag, - bool noLocal, - bool autoAck, - bool exclusive, - bool nowait, - IDictionary arguments); - - public abstract void _Private_BasicGet(string queue, - bool autoAck); - - public abstract void _Private_BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - byte[] body); - - //public abstract void _Private_BasicBatchPublish(string exchange, - // string routingKey, - // bool mandatory, - // IEnumerable messages); - - - public abstract void _Private_BasicRecover(bool requeue); - - public abstract void _Private_ChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId); - - public abstract void _Private_ChannelCloseOk(); - - public abstract void _Private_ChannelFlowOk(bool active); - - public abstract void _Private_ChannelOpen(string outOfBand); - - public abstract void _Private_ConfirmSelect(bool nowait); - - public abstract void _Private_ConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId); - - public abstract void _Private_ConnectionCloseOk(); - - public abstract void _Private_ConnectionOpen(string virtualHost, - string capabilities, - bool insist); - - public abstract void _Private_ConnectionSecureOk(byte[] response); - - public abstract void _Private_ConnectionStartOk(IDictionary clientProperties, - string mechanism, - byte[] response, - string locale); - - public abstract void _Private_ExchangeBind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_ExchangeDeclare(string exchange, - string type, - bool passive, - bool durable, - bool autoDelete, - bool @internal, - bool nowait, - IDictionary arguments); - - public abstract void _Private_ExchangeDelete(string exchange, - bool ifUnused, - bool nowait); - - public abstract void _Private_ExchangeUnbind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_QueueBind(string queue, - string exchange, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - bool nowait, - IDictionary arguments); - - public abstract uint _Private_QueueDelete(string queue, - bool ifUnused, - bool ifEmpty, - bool nowait); - - public abstract uint _Private_QueuePurge(string queue, - bool nowait); - - public void Abort() - { - Abort(Constants.ReplySuccess, "Goodbye"); - } - - public void Abort(ushort replyCode, string replyText) - { - Close(replyCode, replyText, true); - } - - public abstract void BasicAck(ulong deliveryTag, bool multiple); - - public void BasicCancel(string consumerTag) - { - var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag }; - - Enqueue(k); - - _Private_BasicCancel(consumerTag, false); - k.GetReply(this.ContinuationTimeout); - lock (m_consumers) - { - m_consumers.Remove(consumerTag); - } - - ModelShutdown -= k.m_consumer.HandleModelShutdown; - } - - public string BasicConsume(string queue, - bool autoAck, - string consumerTag, - bool noLocal, - bool exclusive, - IDictionary arguments, - IBasicConsumer consumer) - { - // TODO: Replace with flag - var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher; - if (asyncDispatcher != null) - { - var asyncConsumer = consumer as IAsyncBasicConsumer; - if (asyncConsumer == null) - { - // TODO: Friendly message - throw new InvalidOperationException("In the async mode you have to use an async consumer"); - } - } - - var k = new BasicConsumerRpcContinuation { m_consumer = consumer }; - - Enqueue(k); - // Non-nowait. We have an unconventional means of getting - // the RPC response, but a response is still expected. - _Private_BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, - /*nowait:*/ false, arguments); - k.GetReply(this.ContinuationTimeout); - string actualConsumerTag = k.m_consumerTag; - - return actualConsumerTag; - } - - public BasicGetResult BasicGet(string queue, - bool autoAck) - { - var k = new BasicGetRpcContinuation(); - Enqueue(k); - _Private_BasicGet(queue, autoAck); - k.GetReply(this.ContinuationTimeout); - return k.m_result; - } - - public abstract void BasicNack(ulong deliveryTag, - bool multiple, - bool requeue); - - public void BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - byte[] body) - { - if (basicProperties == null) - { - basicProperties = CreateBasicProperties(); - } - if (NextPublishSeqNo > 0) - { - lock (m_unconfirmedSet.SyncRoot) - { - if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) - { - m_unconfirmedSet.Add(NextPublishSeqNo); - } - NextPublishSeqNo++; - } - } - _Private_BasicPublish(exchange, - routingKey, - mandatory, - basicProperties, - body); + _Private_BasicPublish(exchange, + routingKey, + mandatory, + basicProperties, + body); } public void BasicBatchPublish(string exchange, string routingKey, bool mandatory, - IEnumerable messages) - { + IEnumerable messages) + { foreach (var message in messages) { if (message.basicProperties == null) @@ -1274,13 +1268,13 @@ public void BasicBatchPublish(string exchange, } NextPublishSeqNo++; } - } - } - - _Private_BasicBatchPublish(exchange, - routingKey, - mandatory, - messages); + } + } + + _Private_BasicBatchPublish(exchange, + routingKey, + mandatory, + messages); } public void _Private_BasicBatchPublish( string @exchange, @@ -1295,334 +1289,334 @@ public void _Private_BasicBatchPublish( __req.m_mandatory = @mandatory; //__req.m_immediate = @immediate; ModelSend(__req, messages); - } - public abstract void BasicQos(uint prefetchSize, - ushort prefetchCount, - bool global); - - public void BasicRecover(bool requeue) - { - var k = new SimpleBlockingRpcContinuation(); - - Enqueue(k); - _Private_BasicRecover(requeue); - k.GetReply(this.ContinuationTimeout); - } - - public abstract void BasicRecoverAsync(bool requeue); - - public abstract void BasicReject(ulong deliveryTag, - bool requeue); - - public void Close() - { - Close(Constants.ReplySuccess, "Goodbye"); - } - - public void Close(ushort replyCode, string replyText) - { - Close(replyCode, replyText, false); - } - - public void ConfirmSelect() - { - if (NextPublishSeqNo == 0UL) - { - NextPublishSeqNo = 1; - } - _Private_ConfirmSelect(false); - } - - /////////////////////////////////////////////////////////////////////////// - - public abstract IBasicProperties CreateBasicProperties(); - - - public void ExchangeBind(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeBind(destination, source, routingKey, false, arguments); - } - - public void ExchangeBindNoWait(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeBind(destination, source, routingKey, true, arguments); - } - - public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) - { - _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, false, arguments); - } - - public void ExchangeDeclareNoWait(string exchange, - string type, - bool durable, - bool autoDelete, - IDictionary arguments) - { - _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, true, arguments); - } - - public void ExchangeDeclarePassive(string exchange) - { - _Private_ExchangeDeclare(exchange, "", true, false, false, false, false, null); - } - - public void ExchangeDelete(string exchange, - bool ifUnused) - { - _Private_ExchangeDelete(exchange, ifUnused, false); - } - - public void ExchangeDeleteNoWait(string exchange, - bool ifUnused) - { - _Private_ExchangeDelete(exchange, ifUnused, false); - } - - public void ExchangeUnbind(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeUnbind(destination, source, routingKey, false, arguments); - } - - public void ExchangeUnbindNoWait(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeUnbind(destination, source, routingKey, true, arguments); - } - - public void QueueBind(string queue, - string exchange, - string routingKey, - IDictionary arguments) - { - _Private_QueueBind(queue, exchange, routingKey, false, arguments); - } - - public void QueueBindNoWait(string queue, - string exchange, - string routingKey, - IDictionary arguments) - { - _Private_QueueBind(queue, exchange, routingKey, true, arguments); - } - - public QueueDeclareOk QueueDeclare(string queue, bool durable, - bool exclusive, bool autoDelete, - IDictionary arguments) - { - return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); - } - - public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, - bool autoDelete, IDictionary arguments) - { - _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments); - } - - public QueueDeclareOk QueueDeclarePassive(string queue) - { - return QueueDeclare(queue, true, false, false, false, null); - } - - public uint MessageCount(string queue) - { - var ok = QueueDeclarePassive(queue); - return ok.MessageCount; - } - - public uint ConsumerCount(string queue) - { - var ok = QueueDeclarePassive(queue); - return ok.ConsumerCount; - } - - public uint QueueDelete(string queue, - bool ifUnused, - bool ifEmpty) - { - return _Private_QueueDelete(queue, ifUnused, ifEmpty, false); - } - - public void QueueDeleteNoWait(string queue, - bool ifUnused, - bool ifEmpty) - { - _Private_QueueDelete(queue, ifUnused, ifEmpty, true); - } - - public uint QueuePurge(string queue) - { - return _Private_QueuePurge(queue, false); - } - - public abstract void QueueUnbind(string queue, - string exchange, - string routingKey, - IDictionary arguments); - - public abstract void TxCommit(); - - public abstract void TxRollback(); - - public abstract void TxSelect(); - - public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) - { - if (NextPublishSeqNo == 0UL) - { - throw new InvalidOperationException("Confirms not selected"); - } - bool isWaitInfinite = (timeout.TotalMilliseconds == Timeout.Infinite); - Stopwatch stopwatch = Stopwatch.StartNew(); - lock (m_unconfirmedSet.SyncRoot) - { - while (true) - { - if (!IsOpen) - { - throw new AlreadyClosedException(CloseReason); - } - - if (m_unconfirmedSet.Count == 0) - { - bool aux = m_onlyAcksReceived; - m_onlyAcksReceived = true; - timedOut = false; - return aux; - } - if (isWaitInfinite) - { - Monitor.Wait(m_unconfirmedSet.SyncRoot); - } - else - { - TimeSpan elapsed = stopwatch.Elapsed; - if (elapsed > timeout || !Monitor.Wait( - m_unconfirmedSet.SyncRoot, timeout - elapsed)) - { - timedOut = true; - return true; - } - } - } - } - } - - public bool WaitForConfirms() - { - bool timedOut; - return WaitForConfirms(TimeSpan.FromMilliseconds(Timeout.Infinite), out timedOut); - } - - public bool WaitForConfirms(TimeSpan timeout) - { - bool timedOut; - return WaitForConfirms(timeout, out timedOut); - } - - public void WaitForConfirmsOrDie() - { - WaitForConfirmsOrDie(TimeSpan.FromMilliseconds(Timeout.Infinite)); - } - - public void WaitForConfirmsOrDie(TimeSpan timeout) - { - bool timedOut; - bool onlyAcksReceived = WaitForConfirms(timeout, out timedOut); - if (!onlyAcksReceived) - { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, - Constants.ReplySuccess, - "Nacks Received", new IOException("nack received")), - false); - throw new IOException("Nacks Received"); - } - if (timedOut) - { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, - Constants.ReplySuccess, - "Timed out waiting for acks", - new IOException("timed out waiting for acks")), - false); - throw new IOException("Timed out waiting for acks"); - } - } - - protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack) - { - lock (m_unconfirmedSet.SyncRoot) - { - if (multiple) - { - for (ulong i = m_unconfirmedSet[0]; i <= deliveryTag; i++) - { - // removes potential duplicates - while (m_unconfirmedSet.Remove(i)) - { - } - } - } - else - { - while (m_unconfirmedSet.Remove(deliveryTag)) - { - } - } - m_onlyAcksReceived = m_onlyAcksReceived && !isNack; - if (m_unconfirmedSet.Count == 0) - { - Monitor.Pulse(m_unconfirmedSet.SyncRoot); - } - } - } - - private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, - bool autoDelete, IDictionary arguments) - { - var k = new QueueDeclareRpcContinuation(); - Enqueue(k); - _Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); - k.GetReply(this.ContinuationTimeout); - return k.m_result; - } - - public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation - { - public IBasicConsumer m_consumer; - public string m_consumerTag; - } - - public class BasicGetRpcContinuation : SimpleBlockingRpcContinuation - { - public BasicGetResult m_result; - } - - public class ConnectionOpenContinuation : SimpleBlockingRpcContinuation - { - public string m_host; - public string m_knownHosts; - public bool m_redirect; - } - - public class ConnectionStartRpcContinuation : SimpleBlockingRpcContinuation - { - public ConnectionSecureOrTune m_result; - } - - public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation - { - public QueueDeclareOk m_result; - } - } -} + } + public abstract void BasicQos(uint prefetchSize, + ushort prefetchCount, + bool global); + + public void BasicRecover(bool requeue) + { + var k = new SimpleBlockingRpcContinuation(); + + Enqueue(k); + _Private_BasicRecover(requeue); + k.GetReply(this.ContinuationTimeout); + } + + public abstract void BasicRecoverAsync(bool requeue); + + public abstract void BasicReject(ulong deliveryTag, + bool requeue); + + public void Close() + { + Close(Constants.ReplySuccess, "Goodbye"); + } + + public void Close(ushort replyCode, string replyText) + { + Close(replyCode, replyText, false); + } + + public void ConfirmSelect() + { + if (NextPublishSeqNo == 0UL) + { + NextPublishSeqNo = 1; + } + _Private_ConfirmSelect(false); + } + + /////////////////////////////////////////////////////////////////////////// + + public abstract IBasicProperties CreateBasicProperties(); + + + public void ExchangeBind(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeBind(destination, source, routingKey, false, arguments); + } + + public void ExchangeBindNoWait(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeBind(destination, source, routingKey, true, arguments); + } + + public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, false, arguments); + } + + public void ExchangeDeclareNoWait(string exchange, + string type, + bool durable, + bool autoDelete, + IDictionary arguments) + { + _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, true, arguments); + } + + public void ExchangeDeclarePassive(string exchange) + { + _Private_ExchangeDeclare(exchange, "", true, false, false, false, false, null); + } + + public void ExchangeDelete(string exchange, + bool ifUnused) + { + _Private_ExchangeDelete(exchange, ifUnused, false); + } + + public void ExchangeDeleteNoWait(string exchange, + bool ifUnused) + { + _Private_ExchangeDelete(exchange, ifUnused, false); + } + + public void ExchangeUnbind(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeUnbind(destination, source, routingKey, false, arguments); + } + + public void ExchangeUnbindNoWait(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeUnbind(destination, source, routingKey, true, arguments); + } + + public void QueueBind(string queue, + string exchange, + string routingKey, + IDictionary arguments) + { + _Private_QueueBind(queue, exchange, routingKey, false, arguments); + } + + public void QueueBindNoWait(string queue, + string exchange, + string routingKey, + IDictionary arguments) + { + _Private_QueueBind(queue, exchange, routingKey, true, arguments); + } + + public QueueDeclareOk QueueDeclare(string queue, bool durable, + bool exclusive, bool autoDelete, + IDictionary arguments) + { + return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); + } + + public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, + bool autoDelete, IDictionary arguments) + { + _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments); + } + + public QueueDeclareOk QueueDeclarePassive(string queue) + { + return QueueDeclare(queue, true, false, false, false, null); + } + + public uint MessageCount(string queue) + { + var ok = QueueDeclarePassive(queue); + return ok.MessageCount; + } + + public uint ConsumerCount(string queue) + { + var ok = QueueDeclarePassive(queue); + return ok.ConsumerCount; + } + + public uint QueueDelete(string queue, + bool ifUnused, + bool ifEmpty) + { + return _Private_QueueDelete(queue, ifUnused, ifEmpty, false); + } + + public void QueueDeleteNoWait(string queue, + bool ifUnused, + bool ifEmpty) + { + _Private_QueueDelete(queue, ifUnused, ifEmpty, true); + } + + public uint QueuePurge(string queue) + { + return _Private_QueuePurge(queue, false); + } + + public abstract void QueueUnbind(string queue, + string exchange, + string routingKey, + IDictionary arguments); + + public abstract void TxCommit(); + + public abstract void TxRollback(); + + public abstract void TxSelect(); + + public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) + { + if (NextPublishSeqNo == 0UL) + { + throw new InvalidOperationException("Confirms not selected"); + } + bool isWaitInfinite = (timeout.TotalMilliseconds == Timeout.Infinite); + Stopwatch stopwatch = Stopwatch.StartNew(); + lock (m_unconfirmedSet.SyncRoot) + { + while (true) + { + if (!IsOpen) + { + throw new AlreadyClosedException(CloseReason); + } + + if (m_unconfirmedSet.Count == 0) + { + bool aux = m_onlyAcksReceived; + m_onlyAcksReceived = true; + timedOut = false; + return aux; + } + if (isWaitInfinite) + { + Monitor.Wait(m_unconfirmedSet.SyncRoot); + } + else + { + TimeSpan elapsed = stopwatch.Elapsed; + if (elapsed > timeout || !Monitor.Wait( + m_unconfirmedSet.SyncRoot, timeout - elapsed)) + { + timedOut = true; + return true; + } + } + } + } + } + + public bool WaitForConfirms() + { + bool timedOut; + return WaitForConfirms(TimeSpan.FromMilliseconds(Timeout.Infinite), out timedOut); + } + + public bool WaitForConfirms(TimeSpan timeout) + { + bool timedOut; + return WaitForConfirms(timeout, out timedOut); + } + + public void WaitForConfirmsOrDie() + { + WaitForConfirmsOrDie(TimeSpan.FromMilliseconds(Timeout.Infinite)); + } + + public void WaitForConfirmsOrDie(TimeSpan timeout) + { + bool timedOut; + bool onlyAcksReceived = WaitForConfirms(timeout, out timedOut); + if (!onlyAcksReceived) + { + Close(new ShutdownEventArgs(ShutdownInitiator.Application, + Constants.ReplySuccess, + "Nacks Received", new IOException("nack received")), + false); + throw new IOException("Nacks Received"); + } + if (timedOut) + { + Close(new ShutdownEventArgs(ShutdownInitiator.Application, + Constants.ReplySuccess, + "Timed out waiting for acks", + new IOException("timed out waiting for acks")), + false); + throw new IOException("Timed out waiting for acks"); + } + } + + protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (multiple) + { + for (ulong i = m_unconfirmedSet[0]; i <= deliveryTag; i++) + { + // removes potential duplicates + while (m_unconfirmedSet.Remove(i)) + { + } + } + } + else + { + while (m_unconfirmedSet.Remove(deliveryTag)) + { + } + } + m_onlyAcksReceived = m_onlyAcksReceived && !isNack; + if (m_unconfirmedSet.Count == 0) + { + Monitor.Pulse(m_unconfirmedSet.SyncRoot); + } + } + } + + private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, + bool autoDelete, IDictionary arguments) + { + var k = new QueueDeclareRpcContinuation(); + Enqueue(k); + _Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); + k.GetReply(this.ContinuationTimeout); + return k.m_result; + } + + public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation + { + public IBasicConsumer m_consumer; + public string m_consumerTag; + } + + public class BasicGetRpcContinuation : SimpleBlockingRpcContinuation + { + public BasicGetResult m_result; + } + + public class ConnectionOpenContinuation : SimpleBlockingRpcContinuation + { + public string m_host; + public string m_knownHosts; + public bool m_redirect; + } + + public class ConnectionStartRpcContinuation : SimpleBlockingRpcContinuation + { + public ConnectionSecureOrTune m_result; + } + + public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation + { + public QueueDeclareOk m_result; + } + } +} From a1fe5d49cf9eef4e12827f7eacec6f60b8e30ce7 Mon Sep 17 00:00:00 2001 From: Brian Yule Date: Mon, 16 Oct 2017 22:41:29 +0100 Subject: [PATCH 4/6] Updating Indenting. --- .../RabbitMQ.Client/src/client/impl/ModelBase.cs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 318dfb700c..9cc3d7e261 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -1246,10 +1246,8 @@ public void BasicPublish(string exchange, body); } - public void BasicBatchPublish(string exchange, - string routingKey, - bool mandatory, - IEnumerable messages) + public void BasicBatchPublish(string exchange, + string routingKey, bool mandatory, IEnumerable messages) { foreach (var message in messages) { @@ -1277,11 +1275,10 @@ public void BasicBatchPublish(string exchange, messages); } public void _Private_BasicBatchPublish( -string @exchange, -string @routingKey, -bool @mandatory, -//bool @immediate, -IEnumerable messages) + string @exchange, + string @routingKey, + bool @mandatory, + IEnumerable messages) { BasicPublish __req = new BasicPublish(); __req.m_exchange = @exchange; From 4876214d6d6f38064c81a054247d98fb7653d7da Mon Sep 17 00:00:00 2001 From: Brian Yule Date: Mon, 16 Oct 2017 23:05:50 +0100 Subject: [PATCH 5/6] Updating line endings to unix --- .../src/client/impl/ModelBase.cs | 3247 +++++++++-------- 1 file changed, 1628 insertions(+), 1619 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 9cc3d7e261..0da8ac9edd 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -1,1619 +1,1628 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at http://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Exceptions; -using RabbitMQ.Client.Framing; -using RabbitMQ.Client.Framing.Impl; -using RabbitMQ.Util; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Threading; - -#if (NETFX_CORE) -using Trace = System.Diagnostics.Debug; -#endif - -namespace RabbitMQ.Client.Impl -{ - public abstract class ModelBase : IFullModel, IRecoverable - { - public readonly IDictionary m_consumers = new Dictionary(); - - ///Only used to kick-start a connection open - ///sequence. See - public BlockingCell m_connectionStartCell = null; - - private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10); - private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20); - - private RpcContinuationQueue m_continuationQueue = new RpcContinuationQueue(); - private ManualResetEvent m_flowControlBlock = new ManualResetEvent(true); - - private readonly object m_eventLock = new object(); - private readonly object m_flowSendLock = new object(); - private readonly object m_shutdownLock = new object(); - - private readonly SynchronizedList m_unconfirmedSet = new SynchronizedList(); - - private EventHandler m_basicAck; - private EventHandler m_basicNack; - private EventHandler m_basicRecoverOk; - private EventHandler m_basicReturn; - private EventHandler m_callbackException; - private EventHandler m_flowControl; - private EventHandler m_modelShutdown; - - private bool m_onlyAcksReceived = true; - - private EventHandler m_recovery; - - public IConsumerDispatcher ConsumerDispatcher { get; private set; } - - public ModelBase(ISession session) - : this(session, session.Connection.ConsumerWorkService) - { } - - public ModelBase(ISession session, ConsumerWorkService workService) - { - var asyncConsumerWorkService = workService as AsyncConsumerWorkService; - if (asyncConsumerWorkService != null) - { - ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService); - } - else - { - ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService); - } - - Initialise(session); - } - - protected void Initialise(ISession session) - { - CloseReason = null; - NextPublishSeqNo = 0; - Session = session; - Session.CommandReceived = HandleCommand; - Session.SessionShutdown += OnSessionShutdown; - } - - public TimeSpan HandshakeContinuationTimeout - { - get { return m_handshakeContinuationTimeout; } - set { m_handshakeContinuationTimeout = value; } - } - - public TimeSpan ContinuationTimeout - { - get { return m_continuationTimeout; } - set { m_continuationTimeout = value; } - } - - public event EventHandler BasicAcks - { - add - { - lock (m_eventLock) - { - m_basicAck += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicAck -= value; - } - } - } - - public event EventHandler BasicNacks - { - add - { - lock (m_eventLock) - { - m_basicNack += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicNack -= value; - } - } - } - - public event EventHandler BasicRecoverOk - { - add - { - lock (m_eventLock) - { - m_basicRecoverOk += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicRecoverOk -= value; - } - } - } - - public event EventHandler BasicReturn - { - add - { - lock (m_eventLock) - { - m_basicReturn += value; - } - } - remove - { - lock (m_eventLock) - { - m_basicReturn -= value; - } - } - } - - public event EventHandler CallbackException - { - add - { - lock (m_eventLock) - { - m_callbackException += value; - } - } - remove - { - lock (m_eventLock) - { - m_callbackException -= value; - } - } - } - - public event EventHandler FlowControl - { - add - { - lock (m_eventLock) - { - m_flowControl += value; - } - } - remove - { - lock (m_eventLock) - { - m_flowControl -= value; - } - } - } - - public event EventHandler ModelShutdown - { - add - { - bool ok = false; - if (CloseReason == null) - { - lock (m_shutdownLock) - { - if (CloseReason == null) - { - m_modelShutdown += value; - ok = true; - } - } - } - if (!ok) - { - value(this, CloseReason); - } - } - remove - { - lock (m_shutdownLock) - { - m_modelShutdown -= value; - } - } - } - - public event EventHandler Recovery - { - add - { - lock (m_eventLock) - { - m_recovery += value; - } - } - remove - { - lock (m_eventLock) - { - m_recovery -= value; - } - } - } - - public int ChannelNumber - { - get { return ((Session)Session).ChannelNumber; } - } - - public ShutdownEventArgs CloseReason { get; private set; } - - public IBasicConsumer DefaultConsumer { get; set; } - - public bool IsClosed - { - get { return !IsOpen; } - } - - public bool IsOpen - { - get { return CloseReason == null; } - } - - public ulong NextPublishSeqNo { get; private set; } - - public ISession Session { get; private set; } - - public void Close(ushort replyCode, string replyText, bool abort) - { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, - replyCode, replyText), - abort); - } - - public void Close(ShutdownEventArgs reason, bool abort) - { - var k = new ShutdownContinuation(); - ModelShutdown += k.OnConnectionShutdown; - - try - { - ConsumerDispatcher.Quiesce(); - if (SetCloseReason(reason)) - { - _Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0); - } - k.Wait(TimeSpan.FromMilliseconds(10000)); - ConsumerDispatcher.Shutdown(this); - } - catch (AlreadyClosedException) - { - if (!abort) - { - throw; - } - } - catch (IOException) - { - if (!abort) - { - throw; - } - } - catch (Exception) - { - if (!abort) - { - throw; - } - } - } - - public string ConnectionOpen(string virtualHost, - string capabilities, - bool insist) - { - var k = new ConnectionOpenContinuation(); - Enqueue(k); - try - { - _Private_ConnectionOpen(virtualHost, capabilities, insist); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - k.GetReply(HandshakeContinuationTimeout); - return k.m_knownHosts; - } - - public ConnectionSecureOrTune ConnectionSecureOk(byte[] response) - { - var k = new ConnectionStartRpcContinuation(); - Enqueue(k); - try - { - _Private_ConnectionSecureOk(response); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - k.GetReply(HandshakeContinuationTimeout); - return k.m_result; - } - - public ConnectionSecureOrTune ConnectionStartOk(IDictionary clientProperties, - string mechanism, - byte[] response, - string locale) - { - var k = new ConnectionStartRpcContinuation(); - Enqueue(k); - try - { - _Private_ConnectionStartOk(clientProperties, mechanism, - response, locale); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - k.GetReply(HandshakeContinuationTimeout); - return k.m_result; - } - - public abstract bool DispatchAsynchronous(Command cmd); - - public void Enqueue(IRpcContinuation k) - { - bool ok = false; - if (CloseReason == null) - { - lock (m_shutdownLock) - { - if (CloseReason == null) - { - m_continuationQueue.Enqueue(k); - ok = true; - } - } - } - if (!ok) - { - k.HandleModelShutdown(CloseReason); - } - } - - public void FinishClose() - { - if (CloseReason != null) - { - Session.Close(CloseReason); - } - if (m_connectionStartCell != null) - { - m_connectionStartCell.Value = null; - } - } - - public void HandleCommand(ISession session, Command cmd) - { - if (!DispatchAsynchronous(cmd))// Was asynchronous. Already processed. No need to process further. - m_continuationQueue.Next().HandleCommand(cmd); - } - - public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] body) - { - var k = new SimpleBlockingRpcContinuation(); - TransmitAndEnqueue(new Command(method, header, body), k); - return k.GetReply(this.ContinuationTimeout).Method; - } - - public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body) - { - if (method.HasContent) - { - m_flowControlBlock.WaitOne(); - Session.Transmit(new Command(method, header, body)); - } - else - { - Session.Transmit(new Command(method, header, body)); - } - } - public void ModelSend(MethodBase method, IEnumerable messages) - { - if (method.HasContent) - { - m_flowControlBlock.WaitOne(); - } - List commands = new List(); - foreach (var message in messages) - { - commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body)); - } - Session.Transmit(commands); - } - - public virtual void OnBasicAck(BasicAckEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicAck; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicAck")); - } - } - } - - handleAckNack(args.DeliveryTag, args.Multiple, false); - } - - public virtual void OnBasicNack(BasicNackEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicNack; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicNack")); - } - } - } - - handleAckNack(args.DeliveryTag, args.Multiple, true); - } - - public virtual void OnBasicRecoverOk(EventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicRecoverOk; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicRecover")); - } - } - } - } - - public virtual void OnBasicReturn(BasicReturnEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_basicReturn; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicReturn")); - } - } - } - } - - public virtual void OnCallbackException(CallbackExceptionEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_callbackException; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch - { - // Exception in - // Callback-exception-handler. That was the - // app's last chance. Swallow the exception. - // FIXME: proper logging - } - } - } - } - - public virtual void OnFlowControl(FlowControlEventArgs args) - { - EventHandler handler; - lock (m_eventLock) - { - handler = m_flowControl; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, args); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnFlowControl")); - } - } - } - } - - ///Broadcasts notification of the final shutdown of the model. - /// - /// - ///Do not call anywhere other than at the end of OnSessionShutdown. - /// - /// - ///Must not be called when m_closeReason == null, because - ///otherwise there's a window when a new continuation could be - ///being enqueued at the same time as we're broadcasting the - ///shutdown event. See the definition of Enqueue() above. - /// - /// - public virtual void OnModelShutdown(ShutdownEventArgs reason) - { - m_continuationQueue.HandleModelShutdown(reason); - EventHandler handler; - lock (m_shutdownLock) - { - handler = m_modelShutdown; - m_modelShutdown = null; - } - if (handler != null) - { - foreach (EventHandler h in handler.GetInvocationList()) - { - try - { - h(this, reason); - } - catch (Exception e) - { - OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnModelShutdown")); - } - } - } - lock (m_unconfirmedSet.SyncRoot) - Monitor.Pulse(m_unconfirmedSet.SyncRoot); - m_flowControlBlock.Set(); - } - - public void OnSessionShutdown(object sender, ShutdownEventArgs reason) - { - this.ConsumerDispatcher.Quiesce(); - SetCloseReason(reason); - OnModelShutdown(reason); - BroadcastShutdownToConsumers(m_consumers, reason); - this.ConsumerDispatcher.Shutdown(this); - } - - protected void BroadcastShutdownToConsumers(IDictionary cs, ShutdownEventArgs reason) - { - foreach (var c in cs) - { - this.ConsumerDispatcher.HandleModelShutdown(c.Value, reason); - } - } - - public bool SetCloseReason(ShutdownEventArgs reason) - { - if (CloseReason == null) - { - lock (m_shutdownLock) - { - if (CloseReason == null) - { - CloseReason = reason; - return true; - } - else - { - return false; - } - } - } - else - return false; - } - - public override string ToString() - { - return Session.ToString(); - } - - public void TransmitAndEnqueue(Command cmd, IRpcContinuation k) - { - Enqueue(k); - Session.Transmit(cmd); - } - - void IDisposable.Dispose() - { - Abort(); - } - - public abstract void ConnectionTuneOk(ushort channelMax, - uint frameMax, - ushort heartbeat); - - public void HandleBasicAck(ulong deliveryTag, - bool multiple) - { - var e = new BasicAckEventArgs - { - DeliveryTag = deliveryTag, - Multiple = multiple - }; - OnBasicAck(e); - } - - public void HandleBasicCancel(string consumerTag, bool nowait) - { - IBasicConsumer consumer; - lock (m_consumers) - { - consumer = m_consumers[consumerTag]; - m_consumers.Remove(consumerTag); - } - if (consumer == null) - { - consumer = DefaultConsumer; - } - ConsumerDispatcher.HandleBasicCancel(consumer, consumerTag); - } - - public void HandleBasicCancelOk(string consumerTag) - { - var k = - (BasicConsumerRpcContinuation)m_continuationQueue.Next(); -/* - Trace.Assert(k.m_consumerTag == consumerTag, string.Format( - "Consumer tag mismatch during cancel: {0} != {1}", - k.m_consumerTag, - consumerTag - )); -*/ - lock (m_consumers) - { - k.m_consumer = m_consumers[consumerTag]; - m_consumers.Remove(consumerTag); - } - ConsumerDispatcher.HandleBasicCancelOk(k.m_consumer, consumerTag); - k.HandleCommand(null); // release the continuation. - } - - public void HandleBasicConsumeOk(string consumerTag) - { - var k = - (BasicConsumerRpcContinuation)m_continuationQueue.Next(); - k.m_consumerTag = consumerTag; - lock (m_consumers) - { - m_consumers[consumerTag] = k.m_consumer; - } - ConsumerDispatcher.HandleBasicConsumeOk(k.m_consumer, consumerTag); - k.HandleCommand(null); // release the continuation. - } - - public virtual void HandleBasicDeliver(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IBasicProperties basicProperties, - byte[] body) - { - IBasicConsumer consumer; - lock (m_consumers) - { - consumer = m_consumers[consumerTag]; - } - if (consumer == null) - { - if (DefaultConsumer == null) - { - throw new InvalidOperationException("Unsolicited delivery -" + - " see IModel.DefaultConsumer to handle this" + - " case."); - } - else - { - consumer = DefaultConsumer; - } - } - - ConsumerDispatcher.HandleBasicDeliver(consumer, - consumerTag, - deliveryTag, - redelivered, - exchange, - routingKey, - basicProperties, - body); - } - - public void HandleBasicGetEmpty() - { - var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); - k.m_result = null; - k.HandleCommand(null); // release the continuation. - } - - public virtual void HandleBasicGetOk(ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - uint messageCount, - IBasicProperties basicProperties, - byte[] body) - { - var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); - k.m_result = new BasicGetResult(deliveryTag, - redelivered, - exchange, - routingKey, - messageCount, - basicProperties, - body); - k.HandleCommand(null); // release the continuation. - } - - public void HandleBasicNack(ulong deliveryTag, - bool multiple, - bool requeue) - { - var e = new BasicNackEventArgs(); - e.DeliveryTag = deliveryTag; - e.Multiple = multiple; - e.Requeue = requeue; - OnBasicNack(e); - } - - public void HandleBasicRecoverOk() - { - var k = (SimpleBlockingRpcContinuation)m_continuationQueue.Next(); - OnBasicRecoverOk(new EventArgs()); - k.HandleCommand(null); - } - - public void HandleBasicReturn(ushort replyCode, - string replyText, - string exchange, - string routingKey, - IBasicProperties basicProperties, - byte[] body) - { - var e = new BasicReturnEventArgs(); - e.ReplyCode = replyCode; - e.ReplyText = replyText; - e.Exchange = exchange; - e.RoutingKey = routingKey; - e.BasicProperties = basicProperties; - e.Body = body; - OnBasicReturn(e); - } - - public void HandleChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - replyCode, - replyText, - classId, - methodId)); - - Session.Close(CloseReason, false); - try - { - _Private_ChannelCloseOk(); - } - finally - { - Session.Notify(); - } - } - - public void HandleChannelCloseOk() - { - FinishClose(); - } - - public void HandleChannelFlow(bool active) - { - if (active) - { - m_flowControlBlock.Set(); - _Private_ChannelFlowOk(active); - } - else - { - m_flowControlBlock.Reset(); - _Private_ChannelFlowOk(active); - } - OnFlowControl(new FlowControlEventArgs(active)); - } - - public void HandleConnectionBlocked(string reason) - { - var cb = ((Connection)Session.Connection); - - cb.HandleConnectionBlocked(reason); - } - - public void HandleConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, - replyCode, - replyText, - classId, - methodId); - try - { - ((Connection)Session.Connection).InternalClose(reason); - _Private_ConnectionCloseOk(); - SetCloseReason((Session.Connection).CloseReason); - } - catch (IOException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - catch (AlreadyClosedException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - } - - public void HandleConnectionOpenOk(string knownHosts) - { - var k = (ConnectionOpenContinuation)m_continuationQueue.Next(); - k.m_redirect = false; - k.m_host = null; - k.m_knownHosts = knownHosts; - k.HandleCommand(null); // release the continuation. - } - - public void HandleConnectionSecure(byte[] challenge) - { - var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); - k.m_result = new ConnectionSecureOrTune - { - m_challenge = challenge - }; - k.HandleCommand(null); // release the continuation. - } - - public void HandleConnectionStart(byte versionMajor, - byte versionMinor, - IDictionary serverProperties, - byte[] mechanisms, - byte[] locales) - { - if (m_connectionStartCell == null) - { - var reason = - new ShutdownEventArgs(ShutdownInitiator.Library, - Constants.CommandInvalid, - "Unexpected Connection.Start"); - ((Connection)Session.Connection).Close(reason); - } - var details = new ConnectionStartDetails - { - m_versionMajor = versionMajor, - m_versionMinor = versionMinor, - m_serverProperties = serverProperties, - m_mechanisms = mechanisms, - m_locales = locales - }; - m_connectionStartCell.Value = details; - m_connectionStartCell = null; - } - - ///Handle incoming Connection.Tune - ///methods. - public void HandleConnectionTune(ushort channelMax, - uint frameMax, - ushort heartbeat) - { - var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); - k.m_result = new ConnectionSecureOrTune - { - m_tuneDetails = - { - m_channelMax = channelMax, - m_frameMax = frameMax, - m_heartbeat = heartbeat - } - }; - k.HandleCommand(null); // release the continuation. - } - - public void HandleConnectionUnblocked() - { - var cb = ((Connection)Session.Connection); - - cb.HandleConnectionUnblocked(); - } - - public void HandleQueueDeclareOk(string queue, - uint messageCount, - uint consumerCount) - { - var k = (QueueDeclareRpcContinuation)m_continuationQueue.Next(); - k.m_result = new QueueDeclareOk(queue, messageCount, consumerCount); - k.HandleCommand(null); // release the continuation. - } - - public abstract void _Private_BasicCancel(string consumerTag, - bool nowait); - - public abstract void _Private_BasicConsume(string queue, - string consumerTag, - bool noLocal, - bool autoAck, - bool exclusive, - bool nowait, - IDictionary arguments); - - public abstract void _Private_BasicGet(string queue, - bool autoAck); - - public abstract void _Private_BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - byte[] body); - - public abstract void _Private_BasicRecover(bool requeue); - - public abstract void _Private_ChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId); - - public abstract void _Private_ChannelCloseOk(); - - public abstract void _Private_ChannelFlowOk(bool active); - - public abstract void _Private_ChannelOpen(string outOfBand); - - public abstract void _Private_ConfirmSelect(bool nowait); - - public abstract void _Private_ConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId); - - public abstract void _Private_ConnectionCloseOk(); - - public abstract void _Private_ConnectionOpen(string virtualHost, - string capabilities, - bool insist); - - public abstract void _Private_ConnectionSecureOk(byte[] response); - - public abstract void _Private_ConnectionStartOk(IDictionary clientProperties, - string mechanism, - byte[] response, - string locale); - - public abstract void _Private_ExchangeBind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_ExchangeDeclare(string exchange, - string type, - bool passive, - bool durable, - bool autoDelete, - bool @internal, - bool nowait, - IDictionary arguments); - - public abstract void _Private_ExchangeDelete(string exchange, - bool ifUnused, - bool nowait); - - public abstract void _Private_ExchangeUnbind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_QueueBind(string queue, - string exchange, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - bool nowait, - IDictionary arguments); - - public abstract uint _Private_QueueDelete(string queue, - bool ifUnused, - bool ifEmpty, - bool nowait); - - public abstract uint _Private_QueuePurge(string queue, - bool nowait); - - public void Abort() - { - Abort(Constants.ReplySuccess, "Goodbye"); - } - - public void Abort(ushort replyCode, string replyText) - { - Close(replyCode, replyText, true); - } - - public abstract void BasicAck(ulong deliveryTag, bool multiple); - - public void BasicCancel(string consumerTag) - { - var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag }; - - Enqueue(k); - - _Private_BasicCancel(consumerTag, false); - k.GetReply(this.ContinuationTimeout); - lock (m_consumers) - { - m_consumers.Remove(consumerTag); - } - - ModelShutdown -= k.m_consumer.HandleModelShutdown; - } - - public string BasicConsume(string queue, - bool autoAck, - string consumerTag, - bool noLocal, - bool exclusive, - IDictionary arguments, - IBasicConsumer consumer) - { - // TODO: Replace with flag - var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher; - if (asyncDispatcher != null) - { - var asyncConsumer = consumer as IAsyncBasicConsumer; - if (asyncConsumer == null) - { - // TODO: Friendly message - throw new InvalidOperationException("In the async mode you have to use an async consumer"); - } - } - - var k = new BasicConsumerRpcContinuation { m_consumer = consumer }; - - Enqueue(k); - // Non-nowait. We have an unconventional means of getting - // the RPC response, but a response is still expected. - _Private_BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, - /*nowait:*/ false, arguments); - k.GetReply(this.ContinuationTimeout); - string actualConsumerTag = k.m_consumerTag; - - return actualConsumerTag; - } - - public BasicGetResult BasicGet(string queue, - bool autoAck) - { - var k = new BasicGetRpcContinuation(); - Enqueue(k); - _Private_BasicGet(queue, autoAck); - k.GetReply(this.ContinuationTimeout); - return k.m_result; - } - - public abstract void BasicNack(ulong deliveryTag, - bool multiple, - bool requeue); - - public void BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - byte[] body) - { - if (basicProperties == null) - { - basicProperties = CreateBasicProperties(); - } - if (NextPublishSeqNo > 0) - { - lock (m_unconfirmedSet.SyncRoot) - { - if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) - { - m_unconfirmedSet.Add(NextPublishSeqNo); - } - NextPublishSeqNo++; - } - } - _Private_BasicPublish(exchange, - routingKey, - mandatory, - basicProperties, - body); - } - - public void BasicBatchPublish(string exchange, - string routingKey, bool mandatory, IEnumerable messages) - { - foreach (var message in messages) - { - if (message.basicProperties == null) - { - message.basicProperties = CreateBasicProperties(); - } - - if (NextPublishSeqNo > 0) - { - lock (m_unconfirmedSet.SyncRoot) - { - if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) - { - m_unconfirmedSet.Add(NextPublishSeqNo); - } - NextPublishSeqNo++; - } - } - } - - _Private_BasicBatchPublish(exchange, - routingKey, - mandatory, - messages); - } - public void _Private_BasicBatchPublish( - string @exchange, - string @routingKey, - bool @mandatory, - IEnumerable messages) - { - BasicPublish __req = new BasicPublish(); - __req.m_exchange = @exchange; - __req.m_routingKey = @routingKey; - __req.m_mandatory = @mandatory; - //__req.m_immediate = @immediate; - ModelSend(__req, messages); - } - public abstract void BasicQos(uint prefetchSize, - ushort prefetchCount, - bool global); - - public void BasicRecover(bool requeue) - { - var k = new SimpleBlockingRpcContinuation(); - - Enqueue(k); - _Private_BasicRecover(requeue); - k.GetReply(this.ContinuationTimeout); - } - - public abstract void BasicRecoverAsync(bool requeue); - - public abstract void BasicReject(ulong deliveryTag, - bool requeue); - - public void Close() - { - Close(Constants.ReplySuccess, "Goodbye"); - } - - public void Close(ushort replyCode, string replyText) - { - Close(replyCode, replyText, false); - } - - public void ConfirmSelect() - { - if (NextPublishSeqNo == 0UL) - { - NextPublishSeqNo = 1; - } - _Private_ConfirmSelect(false); - } - - /////////////////////////////////////////////////////////////////////////// - - public abstract IBasicProperties CreateBasicProperties(); - - - public void ExchangeBind(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeBind(destination, source, routingKey, false, arguments); - } - - public void ExchangeBindNoWait(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeBind(destination, source, routingKey, true, arguments); - } - - public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) - { - _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, false, arguments); - } - - public void ExchangeDeclareNoWait(string exchange, - string type, - bool durable, - bool autoDelete, - IDictionary arguments) - { - _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, true, arguments); - } - - public void ExchangeDeclarePassive(string exchange) - { - _Private_ExchangeDeclare(exchange, "", true, false, false, false, false, null); - } - - public void ExchangeDelete(string exchange, - bool ifUnused) - { - _Private_ExchangeDelete(exchange, ifUnused, false); - } - - public void ExchangeDeleteNoWait(string exchange, - bool ifUnused) - { - _Private_ExchangeDelete(exchange, ifUnused, false); - } - - public void ExchangeUnbind(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeUnbind(destination, source, routingKey, false, arguments); - } - - public void ExchangeUnbindNoWait(string destination, - string source, - string routingKey, - IDictionary arguments) - { - _Private_ExchangeUnbind(destination, source, routingKey, true, arguments); - } - - public void QueueBind(string queue, - string exchange, - string routingKey, - IDictionary arguments) - { - _Private_QueueBind(queue, exchange, routingKey, false, arguments); - } - - public void QueueBindNoWait(string queue, - string exchange, - string routingKey, - IDictionary arguments) - { - _Private_QueueBind(queue, exchange, routingKey, true, arguments); - } - - public QueueDeclareOk QueueDeclare(string queue, bool durable, - bool exclusive, bool autoDelete, - IDictionary arguments) - { - return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); - } - - public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, - bool autoDelete, IDictionary arguments) - { - _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments); - } - - public QueueDeclareOk QueueDeclarePassive(string queue) - { - return QueueDeclare(queue, true, false, false, false, null); - } - - public uint MessageCount(string queue) - { - var ok = QueueDeclarePassive(queue); - return ok.MessageCount; - } - - public uint ConsumerCount(string queue) - { - var ok = QueueDeclarePassive(queue); - return ok.ConsumerCount; - } - - public uint QueueDelete(string queue, - bool ifUnused, - bool ifEmpty) - { - return _Private_QueueDelete(queue, ifUnused, ifEmpty, false); - } - - public void QueueDeleteNoWait(string queue, - bool ifUnused, - bool ifEmpty) - { - _Private_QueueDelete(queue, ifUnused, ifEmpty, true); - } - - public uint QueuePurge(string queue) - { - return _Private_QueuePurge(queue, false); - } - - public abstract void QueueUnbind(string queue, - string exchange, - string routingKey, - IDictionary arguments); - - public abstract void TxCommit(); - - public abstract void TxRollback(); - - public abstract void TxSelect(); - - public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) - { - if (NextPublishSeqNo == 0UL) - { - throw new InvalidOperationException("Confirms not selected"); - } - bool isWaitInfinite = (timeout.TotalMilliseconds == Timeout.Infinite); - Stopwatch stopwatch = Stopwatch.StartNew(); - lock (m_unconfirmedSet.SyncRoot) - { - while (true) - { - if (!IsOpen) - { - throw new AlreadyClosedException(CloseReason); - } - - if (m_unconfirmedSet.Count == 0) - { - bool aux = m_onlyAcksReceived; - m_onlyAcksReceived = true; - timedOut = false; - return aux; - } - if (isWaitInfinite) - { - Monitor.Wait(m_unconfirmedSet.SyncRoot); - } - else - { - TimeSpan elapsed = stopwatch.Elapsed; - if (elapsed > timeout || !Monitor.Wait( - m_unconfirmedSet.SyncRoot, timeout - elapsed)) - { - timedOut = true; - return true; - } - } - } - } - } - - public bool WaitForConfirms() - { - bool timedOut; - return WaitForConfirms(TimeSpan.FromMilliseconds(Timeout.Infinite), out timedOut); - } - - public bool WaitForConfirms(TimeSpan timeout) - { - bool timedOut; - return WaitForConfirms(timeout, out timedOut); - } - - public void WaitForConfirmsOrDie() - { - WaitForConfirmsOrDie(TimeSpan.FromMilliseconds(Timeout.Infinite)); - } - - public void WaitForConfirmsOrDie(TimeSpan timeout) - { - bool timedOut; - bool onlyAcksReceived = WaitForConfirms(timeout, out timedOut); - if (!onlyAcksReceived) - { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, - Constants.ReplySuccess, - "Nacks Received", new IOException("nack received")), - false); - throw new IOException("Nacks Received"); - } - if (timedOut) - { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, - Constants.ReplySuccess, - "Timed out waiting for acks", - new IOException("timed out waiting for acks")), - false); - throw new IOException("Timed out waiting for acks"); - } - } - - protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack) - { - lock (m_unconfirmedSet.SyncRoot) - { - if (multiple) - { - for (ulong i = m_unconfirmedSet[0]; i <= deliveryTag; i++) - { - // removes potential duplicates - while (m_unconfirmedSet.Remove(i)) - { - } - } - } - else - { - while (m_unconfirmedSet.Remove(deliveryTag)) - { - } - } - m_onlyAcksReceived = m_onlyAcksReceived && !isNack; - if (m_unconfirmedSet.Count == 0) - { - Monitor.Pulse(m_unconfirmedSet.SyncRoot); - } - } - } - - private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, - bool autoDelete, IDictionary arguments) - { - var k = new QueueDeclareRpcContinuation(); - Enqueue(k); - _Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); - k.GetReply(this.ContinuationTimeout); - return k.m_result; - } - - public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation - { - public IBasicConsumer m_consumer; - public string m_consumerTag; - } - - public class BasicGetRpcContinuation : SimpleBlockingRpcContinuation - { - public BasicGetResult m_result; - } - - public class ConnectionOpenContinuation : SimpleBlockingRpcContinuation - { - public string m_host; - public string m_knownHosts; - public bool m_redirect; - } - - public class ConnectionStartRpcContinuation : SimpleBlockingRpcContinuation - { - public ConnectionSecureOrTune m_result; - } - - public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation - { - public QueueDeclareOk m_result; - } - } -} +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 1.1. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2016 Pivotal Software, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v1.1: +// +//--------------------------------------------------------------------------- +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Framing.Impl; +using RabbitMQ.Util; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; + +#if (NETFX_CORE) +using Trace = System.Diagnostics.Debug; +#endif + +namespace RabbitMQ.Client.Impl +{ + public abstract class ModelBase : IFullModel, IRecoverable + { + public readonly IDictionary m_consumers = new Dictionary(); + + ///Only used to kick-start a connection open + ///sequence. See + public BlockingCell m_connectionStartCell = null; + + private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10); + private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20); + + private RpcContinuationQueue m_continuationQueue = new RpcContinuationQueue(); + private ManualResetEvent m_flowControlBlock = new ManualResetEvent(true); + + private readonly object m_eventLock = new object(); + private readonly object m_flowSendLock = new object(); + private readonly object m_shutdownLock = new object(); + + private readonly SynchronizedList m_unconfirmedSet = new SynchronizedList(); + + private EventHandler m_basicAck; + private EventHandler m_basicNack; + private EventHandler m_basicRecoverOk; + private EventHandler m_basicReturn; + private EventHandler m_callbackException; + private EventHandler m_flowControl; + private EventHandler m_modelShutdown; + + private bool m_onlyAcksReceived = true; + + private EventHandler m_recovery; + + public IConsumerDispatcher ConsumerDispatcher { get; private set; } + + public ModelBase(ISession session) + : this(session, session.Connection.ConsumerWorkService) + { } + + public ModelBase(ISession session, ConsumerWorkService workService) + { + var asyncConsumerWorkService = workService as AsyncConsumerWorkService; + if (asyncConsumerWorkService != null) + { + ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService); + } + else + { + ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService); + } + + Initialise(session); + } + + protected void Initialise(ISession session) + { + CloseReason = null; + NextPublishSeqNo = 0; + Session = session; + Session.CommandReceived = HandleCommand; + Session.SessionShutdown += OnSessionShutdown; + } + + public TimeSpan HandshakeContinuationTimeout + { + get { return m_handshakeContinuationTimeout; } + set { m_handshakeContinuationTimeout = value; } + } + + public TimeSpan ContinuationTimeout + { + get { return m_continuationTimeout; } + set { m_continuationTimeout = value; } + } + + public event EventHandler BasicAcks + { + add + { + lock (m_eventLock) + { + m_basicAck += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicAck -= value; + } + } + } + + public event EventHandler BasicNacks + { + add + { + lock (m_eventLock) + { + m_basicNack += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicNack -= value; + } + } + } + + public event EventHandler BasicRecoverOk + { + add + { + lock (m_eventLock) + { + m_basicRecoverOk += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicRecoverOk -= value; + } + } + } + + public event EventHandler BasicReturn + { + add + { + lock (m_eventLock) + { + m_basicReturn += value; + } + } + remove + { + lock (m_eventLock) + { + m_basicReturn -= value; + } + } + } + + public event EventHandler CallbackException + { + add + { + lock (m_eventLock) + { + m_callbackException += value; + } + } + remove + { + lock (m_eventLock) + { + m_callbackException -= value; + } + } + } + + public event EventHandler FlowControl + { + add + { + lock (m_eventLock) + { + m_flowControl += value; + } + } + remove + { + lock (m_eventLock) + { + m_flowControl -= value; + } + } + } + + public event EventHandler ModelShutdown + { + add + { + bool ok = false; + if (CloseReason == null) + { + lock (m_shutdownLock) + { + if (CloseReason == null) + { + m_modelShutdown += value; + ok = true; + } + } + } + if (!ok) + { + value(this, CloseReason); + } + } + remove + { + lock (m_shutdownLock) + { + m_modelShutdown -= value; + } + } + } + + public event EventHandler Recovery + { + add + { + lock (m_eventLock) + { + m_recovery += value; + } + } + remove + { + lock (m_eventLock) + { + m_recovery -= value; + } + } + } + + public int ChannelNumber + { + get { return ((Session)Session).ChannelNumber; } + } + + public ShutdownEventArgs CloseReason { get; private set; } + + public IBasicConsumer DefaultConsumer { get; set; } + + public bool IsClosed + { + get { return !IsOpen; } + } + + public bool IsOpen + { + get { return CloseReason == null; } + } + + public ulong NextPublishSeqNo { get; private set; } + + public ISession Session { get; private set; } + + public void Close(ushort replyCode, string replyText, bool abort) + { + Close(new ShutdownEventArgs(ShutdownInitiator.Application, + replyCode, replyText), + abort); + } + + public void Close(ShutdownEventArgs reason, bool abort) + { + var k = new ShutdownContinuation(); + ModelShutdown += k.OnConnectionShutdown; + + try + { + ConsumerDispatcher.Quiesce(); + if (SetCloseReason(reason)) + { + _Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0); + } + k.Wait(TimeSpan.FromMilliseconds(10000)); + ConsumerDispatcher.Shutdown(this); + } + catch (AlreadyClosedException) + { + if (!abort) + { + throw; + } + } + catch (IOException) + { + if (!abort) + { + throw; + } + } + catch (Exception) + { + if (!abort) + { + throw; + } + } + } + + public string ConnectionOpen(string virtualHost, + string capabilities, + bool insist) + { + var k = new ConnectionOpenContinuation(); + Enqueue(k); + try + { + _Private_ConnectionOpen(virtualHost, capabilities, insist); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } + k.GetReply(HandshakeContinuationTimeout); + return k.m_knownHosts; + } + + public ConnectionSecureOrTune ConnectionSecureOk(byte[] response) + { + var k = new ConnectionStartRpcContinuation(); + Enqueue(k); + try + { + _Private_ConnectionSecureOk(response); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } + k.GetReply(HandshakeContinuationTimeout); + return k.m_result; + } + + public ConnectionSecureOrTune ConnectionStartOk(IDictionary clientProperties, + string mechanism, + byte[] response, + string locale) + { + var k = new ConnectionStartRpcContinuation(); + Enqueue(k); + try + { + _Private_ConnectionStartOk(clientProperties, mechanism, + response, locale); + } + catch (AlreadyClosedException) + { + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes + } + k.GetReply(HandshakeContinuationTimeout); + return k.m_result; + } + + public abstract bool DispatchAsynchronous(Command cmd); + + public void Enqueue(IRpcContinuation k) + { + bool ok = false; + if (CloseReason == null) + { + lock (m_shutdownLock) + { + if (CloseReason == null) + { + m_continuationQueue.Enqueue(k); + ok = true; + } + } + } + if (!ok) + { + k.HandleModelShutdown(CloseReason); + } + } + + public void FinishClose() + { + if (CloseReason != null) + { + Session.Close(CloseReason); + } + if (m_connectionStartCell != null) + { + m_connectionStartCell.Value = null; + } + } + + public void HandleCommand(ISession session, Command cmd) + { + if (!DispatchAsynchronous(cmd))// Was asynchronous. Already processed. No need to process further. + m_continuationQueue.Next().HandleCommand(cmd); + } + + public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] body) + { + var k = new SimpleBlockingRpcContinuation(); + TransmitAndEnqueue(new Command(method, header, body), k); + return k.GetReply(this.ContinuationTimeout).Method; + } + + public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body) + { + if (method.HasContent) + { + m_flowControlBlock.WaitOne(); + Session.Transmit(new Command(method, header, body)); + } + else + { + Session.Transmit(new Command(method, header, body)); + } + } + public void ModelSend(MethodBase method, IEnumerable messages) + { + if (method.HasContent) + { + m_flowControlBlock.WaitOne(); + } + List commands = new List(); + foreach (var message in messages) + { + commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body)); + } + Session.Transmit(commands); + } + + public virtual void OnBasicAck(BasicAckEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicAck; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicAck")); + } + } + } + + handleAckNack(args.DeliveryTag, args.Multiple, false); + } + + public virtual void OnBasicNack(BasicNackEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicNack; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicNack")); + } + } + } + + handleAckNack(args.DeliveryTag, args.Multiple, true); + } + + public virtual void OnBasicRecoverOk(EventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicRecoverOk; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicRecover")); + } + } + } + } + + public virtual void OnBasicReturn(BasicReturnEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_basicReturn; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnBasicReturn")); + } + } + } + } + + public virtual void OnCallbackException(CallbackExceptionEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_callbackException; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch + { + // Exception in + // Callback-exception-handler. That was the + // app's last chance. Swallow the exception. + // FIXME: proper logging + } + } + } + } + + public virtual void OnFlowControl(FlowControlEventArgs args) + { + EventHandler handler; + lock (m_eventLock) + { + handler = m_flowControl; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, args); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnFlowControl")); + } + } + } + } + + ///Broadcasts notification of the final shutdown of the model. + /// + /// + ///Do not call anywhere other than at the end of OnSessionShutdown. + /// + /// + ///Must not be called when m_closeReason == null, because + ///otherwise there's a window when a new continuation could be + ///being enqueued at the same time as we're broadcasting the + ///shutdown event. See the definition of Enqueue() above. + /// + /// + public virtual void OnModelShutdown(ShutdownEventArgs reason) + { + m_continuationQueue.HandleModelShutdown(reason); + EventHandler handler; + lock (m_shutdownLock) + { + handler = m_modelShutdown; + m_modelShutdown = null; + } + if (handler != null) + { + foreach (EventHandler h in handler.GetInvocationList()) + { + try + { + h(this, reason); + } + catch (Exception e) + { + OnCallbackException(CallbackExceptionEventArgs.Build(e, "OnModelShutdown")); + } + } + } + lock (m_unconfirmedSet.SyncRoot) + Monitor.Pulse(m_unconfirmedSet.SyncRoot); + m_flowControlBlock.Set(); + } + + public void OnSessionShutdown(object sender, ShutdownEventArgs reason) + { + this.ConsumerDispatcher.Quiesce(); + SetCloseReason(reason); + OnModelShutdown(reason); + BroadcastShutdownToConsumers(m_consumers, reason); + this.ConsumerDispatcher.Shutdown(this); + } + + protected void BroadcastShutdownToConsumers(IDictionary cs, ShutdownEventArgs reason) + { + foreach (var c in cs) + { + this.ConsumerDispatcher.HandleModelShutdown(c.Value, reason); + } + } + + public bool SetCloseReason(ShutdownEventArgs reason) + { + if (CloseReason == null) + { + lock (m_shutdownLock) + { + if (CloseReason == null) + { + CloseReason = reason; + return true; + } + else + { + return false; + } + } + } + else + return false; + } + + public override string ToString() + { + return Session.ToString(); + } + + public void TransmitAndEnqueue(Command cmd, IRpcContinuation k) + { + Enqueue(k); + Session.Transmit(cmd); + } + + void IDisposable.Dispose() + { + Abort(); + } + + public abstract void ConnectionTuneOk(ushort channelMax, + uint frameMax, + ushort heartbeat); + + public void HandleBasicAck(ulong deliveryTag, + bool multiple) + { + var e = new BasicAckEventArgs + { + DeliveryTag = deliveryTag, + Multiple = multiple + }; + OnBasicAck(e); + } + + public void HandleBasicCancel(string consumerTag, bool nowait) + { + IBasicConsumer consumer; + lock (m_consumers) + { + consumer = m_consumers[consumerTag]; + m_consumers.Remove(consumerTag); + } + if (consumer == null) + { + consumer = DefaultConsumer; + } + ConsumerDispatcher.HandleBasicCancel(consumer, consumerTag); + } + + public void HandleBasicCancelOk(string consumerTag) + { + var k = + (BasicConsumerRpcContinuation)m_continuationQueue.Next(); +/* + Trace.Assert(k.m_consumerTag == consumerTag, string.Format( + "Consumer tag mismatch during cancel: {0} != {1}", + k.m_consumerTag, + consumerTag + )); +*/ + lock (m_consumers) + { + k.m_consumer = m_consumers[consumerTag]; + m_consumers.Remove(consumerTag); + } + ConsumerDispatcher.HandleBasicCancelOk(k.m_consumer, consumerTag); + k.HandleCommand(null); // release the continuation. + } + + public void HandleBasicConsumeOk(string consumerTag) + { + var k = + (BasicConsumerRpcContinuation)m_continuationQueue.Next(); + k.m_consumerTag = consumerTag; + lock (m_consumers) + { + m_consumers[consumerTag] = k.m_consumer; + } + ConsumerDispatcher.HandleBasicConsumeOk(k.m_consumer, consumerTag); + k.HandleCommand(null); // release the continuation. + } + + public virtual void HandleBasicDeliver(string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + IBasicProperties basicProperties, + byte[] body) + { + IBasicConsumer consumer; + lock (m_consumers) + { + consumer = m_consumers[consumerTag]; + } + if (consumer == null) + { + if (DefaultConsumer == null) + { + throw new InvalidOperationException("Unsolicited delivery -" + + " see IModel.DefaultConsumer to handle this" + + " case."); + } + else + { + consumer = DefaultConsumer; + } + } + + ConsumerDispatcher.HandleBasicDeliver(consumer, + consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey, + basicProperties, + body); + } + + public void HandleBasicGetEmpty() + { + var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); + k.m_result = null; + k.HandleCommand(null); // release the continuation. + } + + public virtual void HandleBasicGetOk(ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + uint messageCount, + IBasicProperties basicProperties, + byte[] body) + { + var k = (BasicGetRpcContinuation)m_continuationQueue.Next(); + k.m_result = new BasicGetResult(deliveryTag, + redelivered, + exchange, + routingKey, + messageCount, + basicProperties, + body); + k.HandleCommand(null); // release the continuation. + } + + public void HandleBasicNack(ulong deliveryTag, + bool multiple, + bool requeue) + { + var e = new BasicNackEventArgs(); + e.DeliveryTag = deliveryTag; + e.Multiple = multiple; + e.Requeue = requeue; + OnBasicNack(e); + } + + public void HandleBasicRecoverOk() + { + var k = (SimpleBlockingRpcContinuation)m_continuationQueue.Next(); + OnBasicRecoverOk(new EventArgs()); + k.HandleCommand(null); + } + + public void HandleBasicReturn(ushort replyCode, + string replyText, + string exchange, + string routingKey, + IBasicProperties basicProperties, + byte[] body) + { + var e = new BasicReturnEventArgs(); + e.ReplyCode = replyCode; + e.ReplyText = replyText; + e.Exchange = exchange; + e.RoutingKey = routingKey; + e.BasicProperties = basicProperties; + e.Body = body; + OnBasicReturn(e); + } + + public void HandleChannelClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId) + { + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + replyCode, + replyText, + classId, + methodId)); + + Session.Close(CloseReason, false); + try + { + _Private_ChannelCloseOk(); + } + finally + { + Session.Notify(); + } + } + + public void HandleChannelCloseOk() + { + FinishClose(); + } + + public void HandleChannelFlow(bool active) + { + if (active) + { + m_flowControlBlock.Set(); + _Private_ChannelFlowOk(active); + } + else + { + m_flowControlBlock.Reset(); + _Private_ChannelFlowOk(active); + } + OnFlowControl(new FlowControlEventArgs(active)); + } + + public void HandleConnectionBlocked(string reason) + { + var cb = ((Connection)Session.Connection); + + cb.HandleConnectionBlocked(reason); + } + + public void HandleConnectionClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId) + { + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, + replyCode, + replyText, + classId, + methodId); + try + { + ((Connection)Session.Connection).InternalClose(reason); + _Private_ConnectionCloseOk(); + SetCloseReason((Session.Connection).CloseReason); + } + catch (IOException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + catch (AlreadyClosedException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + } + + public void HandleConnectionOpenOk(string knownHosts) + { + var k = (ConnectionOpenContinuation)m_continuationQueue.Next(); + k.m_redirect = false; + k.m_host = null; + k.m_knownHosts = knownHosts; + k.HandleCommand(null); // release the continuation. + } + + public void HandleConnectionSecure(byte[] challenge) + { + var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); + k.m_result = new ConnectionSecureOrTune + { + m_challenge = challenge + }; + k.HandleCommand(null); // release the continuation. + } + + public void HandleConnectionStart(byte versionMajor, + byte versionMinor, + IDictionary serverProperties, + byte[] mechanisms, + byte[] locales) + { + if (m_connectionStartCell == null) + { + var reason = + new ShutdownEventArgs(ShutdownInitiator.Library, + Constants.CommandInvalid, + "Unexpected Connection.Start"); + ((Connection)Session.Connection).Close(reason); + } + var details = new ConnectionStartDetails + { + m_versionMajor = versionMajor, + m_versionMinor = versionMinor, + m_serverProperties = serverProperties, + m_mechanisms = mechanisms, + m_locales = locales + }; + m_connectionStartCell.Value = details; + m_connectionStartCell = null; + } + + ///Handle incoming Connection.Tune + ///methods. + public void HandleConnectionTune(ushort channelMax, + uint frameMax, + ushort heartbeat) + { + var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); + k.m_result = new ConnectionSecureOrTune + { + m_tuneDetails = + { + m_channelMax = channelMax, + m_frameMax = frameMax, + m_heartbeat = heartbeat + } + }; + k.HandleCommand(null); // release the continuation. + } + + public void HandleConnectionUnblocked() + { + var cb = ((Connection)Session.Connection); + + cb.HandleConnectionUnblocked(); + } + + public void HandleQueueDeclareOk(string queue, + uint messageCount, + uint consumerCount) + { + var k = (QueueDeclareRpcContinuation)m_continuationQueue.Next(); + k.m_result = new QueueDeclareOk(queue, messageCount, consumerCount); + k.HandleCommand(null); // release the continuation. + } + + public abstract void _Private_BasicCancel(string consumerTag, + bool nowait); + + public abstract void _Private_BasicConsume(string queue, + string consumerTag, + bool noLocal, + bool autoAck, + bool exclusive, + bool nowait, + IDictionary arguments); + + public abstract void _Private_BasicGet(string queue, + bool autoAck); + + public abstract void _Private_BasicPublish(string exchange, + string routingKey, + bool mandatory, + IBasicProperties basicProperties, + byte[] body); + + //public abstract void _Private_BasicBatchPublish(string exchange, + // string routingKey, + // bool mandatory, + // IEnumerable messages); + + + public abstract void _Private_BasicRecover(bool requeue); + + public abstract void _Private_ChannelClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId); + + public abstract void _Private_ChannelCloseOk(); + + public abstract void _Private_ChannelFlowOk(bool active); + + public abstract void _Private_ChannelOpen(string outOfBand); + + public abstract void _Private_ConfirmSelect(bool nowait); + + public abstract void _Private_ConnectionClose(ushort replyCode, + string replyText, + ushort classId, + ushort methodId); + + public abstract void _Private_ConnectionCloseOk(); + + public abstract void _Private_ConnectionOpen(string virtualHost, + string capabilities, + bool insist); + + public abstract void _Private_ConnectionSecureOk(byte[] response); + + public abstract void _Private_ConnectionStartOk(IDictionary clientProperties, + string mechanism, + byte[] response, + string locale); + + public abstract void _Private_ExchangeBind(string destination, + string source, + string routingKey, + bool nowait, + IDictionary arguments); + + public abstract void _Private_ExchangeDeclare(string exchange, + string type, + bool passive, + bool durable, + bool autoDelete, + bool @internal, + bool nowait, + IDictionary arguments); + + public abstract void _Private_ExchangeDelete(string exchange, + bool ifUnused, + bool nowait); + + public abstract void _Private_ExchangeUnbind(string destination, + string source, + string routingKey, + bool nowait, + IDictionary arguments); + + public abstract void _Private_QueueBind(string queue, + string exchange, + string routingKey, + bool nowait, + IDictionary arguments); + + public abstract void _Private_QueueDeclare(string queue, + bool passive, + bool durable, + bool exclusive, + bool autoDelete, + bool nowait, + IDictionary arguments); + + public abstract uint _Private_QueueDelete(string queue, + bool ifUnused, + bool ifEmpty, + bool nowait); + + public abstract uint _Private_QueuePurge(string queue, + bool nowait); + + public void Abort() + { + Abort(Constants.ReplySuccess, "Goodbye"); + } + + public void Abort(ushort replyCode, string replyText) + { + Close(replyCode, replyText, true); + } + + public abstract void BasicAck(ulong deliveryTag, bool multiple); + + public void BasicCancel(string consumerTag) + { + var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag }; + + Enqueue(k); + + _Private_BasicCancel(consumerTag, false); + k.GetReply(this.ContinuationTimeout); + lock (m_consumers) + { + m_consumers.Remove(consumerTag); + } + + ModelShutdown -= k.m_consumer.HandleModelShutdown; + } + + public string BasicConsume(string queue, + bool autoAck, + string consumerTag, + bool noLocal, + bool exclusive, + IDictionary arguments, + IBasicConsumer consumer) + { + // TODO: Replace with flag + var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher; + if (asyncDispatcher != null) + { + var asyncConsumer = consumer as IAsyncBasicConsumer; + if (asyncConsumer == null) + { + // TODO: Friendly message + throw new InvalidOperationException("In the async mode you have to use an async consumer"); + } + } + + var k = new BasicConsumerRpcContinuation { m_consumer = consumer }; + + Enqueue(k); + // Non-nowait. We have an unconventional means of getting + // the RPC response, but a response is still expected. + _Private_BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, + /*nowait:*/ false, arguments); + k.GetReply(this.ContinuationTimeout); + string actualConsumerTag = k.m_consumerTag; + + return actualConsumerTag; + } + + public BasicGetResult BasicGet(string queue, + bool autoAck) + { + var k = new BasicGetRpcContinuation(); + Enqueue(k); + _Private_BasicGet(queue, autoAck); + k.GetReply(this.ContinuationTimeout); + return k.m_result; + } + + public abstract void BasicNack(ulong deliveryTag, + bool multiple, + bool requeue); + + public void BasicPublish(string exchange, + string routingKey, + bool mandatory, + IBasicProperties basicProperties, + byte[] body) + { + if (basicProperties == null) + { + basicProperties = CreateBasicProperties(); + } + if (NextPublishSeqNo > 0) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) + { + m_unconfirmedSet.Add(NextPublishSeqNo); + } + NextPublishSeqNo++; + } + } + _Private_BasicPublish(exchange, + routingKey, + mandatory, + basicProperties, + body); + } + + public void BasicBatchPublish(string exchange, + string routingKey, + bool mandatory, + IEnumerable messages) + { + foreach (var message in messages) + { + if (message.basicProperties == null) + { + message.basicProperties = CreateBasicProperties(); + } + + if (NextPublishSeqNo > 0) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) + { + m_unconfirmedSet.Add(NextPublishSeqNo); + } + NextPublishSeqNo++; + } + } + } + + _Private_BasicBatchPublish(exchange, + routingKey, + mandatory, + messages); + } + public void _Private_BasicBatchPublish( +string @exchange, +string @routingKey, +bool @mandatory, +//bool @immediate, +IEnumerable messages) + { + BasicPublish __req = new BasicPublish(); + __req.m_exchange = @exchange; + __req.m_routingKey = @routingKey; + __req.m_mandatory = @mandatory; + //__req.m_immediate = @immediate; + ModelSend(__req, messages); + } + public abstract void BasicQos(uint prefetchSize, + ushort prefetchCount, + bool global); + + public void BasicRecover(bool requeue) + { + var k = new SimpleBlockingRpcContinuation(); + + Enqueue(k); + _Private_BasicRecover(requeue); + k.GetReply(this.ContinuationTimeout); + } + + public abstract void BasicRecoverAsync(bool requeue); + + public abstract void BasicReject(ulong deliveryTag, + bool requeue); + + public void Close() + { + Close(Constants.ReplySuccess, "Goodbye"); + } + + public void Close(ushort replyCode, string replyText) + { + Close(replyCode, replyText, false); + } + + public void ConfirmSelect() + { + if (NextPublishSeqNo == 0UL) + { + NextPublishSeqNo = 1; + } + _Private_ConfirmSelect(false); + } + + /////////////////////////////////////////////////////////////////////////// + + public abstract IBasicProperties CreateBasicProperties(); + + + public void ExchangeBind(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeBind(destination, source, routingKey, false, arguments); + } + + public void ExchangeBindNoWait(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeBind(destination, source, routingKey, true, arguments); + } + + public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, false, arguments); + } + + public void ExchangeDeclareNoWait(string exchange, + string type, + bool durable, + bool autoDelete, + IDictionary arguments) + { + _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, true, arguments); + } + + public void ExchangeDeclarePassive(string exchange) + { + _Private_ExchangeDeclare(exchange, "", true, false, false, false, false, null); + } + + public void ExchangeDelete(string exchange, + bool ifUnused) + { + _Private_ExchangeDelete(exchange, ifUnused, false); + } + + public void ExchangeDeleteNoWait(string exchange, + bool ifUnused) + { + _Private_ExchangeDelete(exchange, ifUnused, false); + } + + public void ExchangeUnbind(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeUnbind(destination, source, routingKey, false, arguments); + } + + public void ExchangeUnbindNoWait(string destination, + string source, + string routingKey, + IDictionary arguments) + { + _Private_ExchangeUnbind(destination, source, routingKey, true, arguments); + } + + public void QueueBind(string queue, + string exchange, + string routingKey, + IDictionary arguments) + { + _Private_QueueBind(queue, exchange, routingKey, false, arguments); + } + + public void QueueBindNoWait(string queue, + string exchange, + string routingKey, + IDictionary arguments) + { + _Private_QueueBind(queue, exchange, routingKey, true, arguments); + } + + public QueueDeclareOk QueueDeclare(string queue, bool durable, + bool exclusive, bool autoDelete, + IDictionary arguments) + { + return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); + } + + public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, + bool autoDelete, IDictionary arguments) + { + _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments); + } + + public QueueDeclareOk QueueDeclarePassive(string queue) + { + return QueueDeclare(queue, true, false, false, false, null); + } + + public uint MessageCount(string queue) + { + var ok = QueueDeclarePassive(queue); + return ok.MessageCount; + } + + public uint ConsumerCount(string queue) + { + var ok = QueueDeclarePassive(queue); + return ok.ConsumerCount; + } + + public uint QueueDelete(string queue, + bool ifUnused, + bool ifEmpty) + { + return _Private_QueueDelete(queue, ifUnused, ifEmpty, false); + } + + public void QueueDeleteNoWait(string queue, + bool ifUnused, + bool ifEmpty) + { + _Private_QueueDelete(queue, ifUnused, ifEmpty, true); + } + + public uint QueuePurge(string queue) + { + return _Private_QueuePurge(queue, false); + } + + public abstract void QueueUnbind(string queue, + string exchange, + string routingKey, + IDictionary arguments); + + public abstract void TxCommit(); + + public abstract void TxRollback(); + + public abstract void TxSelect(); + + public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) + { + if (NextPublishSeqNo == 0UL) + { + throw new InvalidOperationException("Confirms not selected"); + } + bool isWaitInfinite = (timeout.TotalMilliseconds == Timeout.Infinite); + Stopwatch stopwatch = Stopwatch.StartNew(); + lock (m_unconfirmedSet.SyncRoot) + { + while (true) + { + if (!IsOpen) + { + throw new AlreadyClosedException(CloseReason); + } + + if (m_unconfirmedSet.Count == 0) + { + bool aux = m_onlyAcksReceived; + m_onlyAcksReceived = true; + timedOut = false; + return aux; + } + if (isWaitInfinite) + { + Monitor.Wait(m_unconfirmedSet.SyncRoot); + } + else + { + TimeSpan elapsed = stopwatch.Elapsed; + if (elapsed > timeout || !Monitor.Wait( + m_unconfirmedSet.SyncRoot, timeout - elapsed)) + { + timedOut = true; + return true; + } + } + } + } + } + + public bool WaitForConfirms() + { + bool timedOut; + return WaitForConfirms(TimeSpan.FromMilliseconds(Timeout.Infinite), out timedOut); + } + + public bool WaitForConfirms(TimeSpan timeout) + { + bool timedOut; + return WaitForConfirms(timeout, out timedOut); + } + + public void WaitForConfirmsOrDie() + { + WaitForConfirmsOrDie(TimeSpan.FromMilliseconds(Timeout.Infinite)); + } + + public void WaitForConfirmsOrDie(TimeSpan timeout) + { + bool timedOut; + bool onlyAcksReceived = WaitForConfirms(timeout, out timedOut); + if (!onlyAcksReceived) + { + Close(new ShutdownEventArgs(ShutdownInitiator.Application, + Constants.ReplySuccess, + "Nacks Received", new IOException("nack received")), + false); + throw new IOException("Nacks Received"); + } + if (timedOut) + { + Close(new ShutdownEventArgs(ShutdownInitiator.Application, + Constants.ReplySuccess, + "Timed out waiting for acks", + new IOException("timed out waiting for acks")), + false); + throw new IOException("Timed out waiting for acks"); + } + } + + protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (multiple) + { + for (ulong i = m_unconfirmedSet[0]; i <= deliveryTag; i++) + { + // removes potential duplicates + while (m_unconfirmedSet.Remove(i)) + { + } + } + } + else + { + while (m_unconfirmedSet.Remove(deliveryTag)) + { + } + } + m_onlyAcksReceived = m_onlyAcksReceived && !isNack; + if (m_unconfirmedSet.Count == 0) + { + Monitor.Pulse(m_unconfirmedSet.SyncRoot); + } + } + } + + private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, + bool autoDelete, IDictionary arguments) + { + var k = new QueueDeclareRpcContinuation(); + Enqueue(k); + _Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); + k.GetReply(this.ContinuationTimeout); + return k.m_result; + } + + public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation + { + public IBasicConsumer m_consumer; + public string m_consumerTag; + } + + public class BasicGetRpcContinuation : SimpleBlockingRpcContinuation + { + public BasicGetResult m_result; + } + + public class ConnectionOpenContinuation : SimpleBlockingRpcContinuation + { + public string m_host; + public string m_knownHosts; + public bool m_redirect; + } + + public class ConnectionStartRpcContinuation : SimpleBlockingRpcContinuation + { + public ConnectionSecureOrTune m_result; + } + + public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation + { + public QueueDeclareOk m_result; + } + } +} From 738c3c3b03abc8a02372558987b97beaaf725608 Mon Sep 17 00:00:00 2001 From: Brian Yule Date: Tue, 17 Oct 2017 08:38:37 +0100 Subject: [PATCH 6/6] Removing Commented code --- .../client/RabbitMQ.Client/src/client/impl/ModelBase.cs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 0da8ac9edd..1592ef2431 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -1063,12 +1063,6 @@ public abstract void _Private_BasicPublish(string exchange, IBasicProperties basicProperties, byte[] body); - //public abstract void _Private_BasicBatchPublish(string exchange, - // string routingKey, - // bool mandatory, - // IEnumerable messages); - - public abstract void _Private_BasicRecover(bool requeue); public abstract void _Private_ChannelClose(ushort replyCode,