Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout in connect method #56

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions M2Mqtt/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public class MqttClient
// event for raising received message event
private AutoResetEvent receiveEventWaitHandle;

private AutoResetEvent closeEventWaitHandle;

// event for starting process inflight queue asynchronously
private AutoResetEvent inflightWaitHandle;

Expand All @@ -137,7 +139,8 @@ public class MqttClient

// exeption thrown during receiving
Exception exReceiving;

// Connection timeout for ssl authentication
private int connectTimeout;
// keep alive period (in ms)
private int keepAlivePeriod;
// events for signaling on keep alive thread
Expand Down Expand Up @@ -272,7 +275,7 @@ public MqttClient(IPAddress brokerIpAddress) :
public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
{
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol, 0, null, null);
#else
this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol);
#endif
Expand All @@ -285,7 +288,7 @@ public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Ce
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
public MqttClient(string brokerHostName) :
#if !(WINDOWS_APP || WINDOWS_PHONE_APP)
this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None, 0)
#else
this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, MqttSslProtocols.None)
#endif
Expand All @@ -302,13 +305,13 @@ public MqttClient(string brokerHostName) :
#if !(WINDOWS_APP || WINDOWS_PHONE_APP)
/// <param name="caCert">CA certificate for secure connection</param>
/// <param name="clientCert">Client certificate</param>
public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout)
#else
public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
#endif
{
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, null, null);
#elif (WINDOWS_APP || WINDOWS_PHONE_APP)
this.Init(brokerHostName, brokerPort, secure, sslProtocol);
#else
Expand All @@ -329,9 +332,9 @@ public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslPro
/// <param name="clientCert">Client certificate</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
/// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout,
RemoteCertificateValidationCallback userCertificateValidationCallback)
: this(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, null)
: this(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, userCertificateValidationCallback, null)
{
}

Expand All @@ -344,10 +347,10 @@ public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certif
/// <param name="sslProtocol">SSL/TLS protocol version</param>
/// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol,
public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol, int connectTimeout,
RemoteCertificateValidationCallback userCertificateValidationCallback,
LocalCertificateSelectionCallback userCertificateSelectionCallback)
: this(brokerHostName, brokerPort, secure, null, null, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback)
: this(brokerHostName, brokerPort, secure, null, null, sslProtocol, connectTimeout, userCertificateValidationCallback, userCertificateSelectionCallback)
{
}

Expand All @@ -362,11 +365,11 @@ public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslPro
/// <param name="sslProtocol">SSL/TLS protocol version</param>
/// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout,
RemoteCertificateValidationCallback userCertificateValidationCallback,
LocalCertificateSelectionCallback userCertificateSelectionCallback)
{
this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, userCertificateValidationCallback, userCertificateSelectionCallback);
}
#endif

Expand Down Expand Up @@ -398,6 +401,7 @@ public MqttClient(IMqttNetworkChannel channel)

// queue for received message
this.receiveEventWaitHandle = new AutoResetEvent(false);
this.closeEventWaitHandle = new AutoResetEvent(false);
this.eventQueue = new Queue();
this.internalQueue = new Queue();

Expand All @@ -418,7 +422,7 @@ public MqttClient(IMqttNetworkChannel channel)
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
/// <param name="userCertificateSelectionCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateValidationCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout,
RemoteCertificateValidationCallback userCertificateValidationCallback,
LocalCertificateSelectionCallback userCertificateSelectionCallback)
#elif (WINDOWS_APP || WINDOWS_PHONE_APP)
Expand All @@ -445,7 +449,7 @@ private void Init(string brokerHostName, int brokerPort, bool secure, X509Certif
this.settings.Port = this.brokerPort;
else
this.settings.SslPort = this.brokerPort;

this.connectTimeout = connectTimeout;
this.syncEndReceiving = new AutoResetEvent(false);
this.keepAliveEvent = new AutoResetEvent(false);

Expand All @@ -455,6 +459,7 @@ private void Init(string brokerHostName, int brokerPort, bool secure, X509Certif

// queue for received message
this.receiveEventWaitHandle = new AutoResetEvent(false);
this.closeEventWaitHandle = new AutoResetEvent(false);
this.eventQueue = new Queue();
this.internalQueue = new Queue();

Expand All @@ -463,7 +468,7 @@ private void Init(string brokerHostName, int brokerPort, bool secure, X509Certif

// create network channel
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, userCertificateValidationCallback, userCertificateSelectionCallback);
#elif (WINDOWS_APP || WINDOWS_PHONE_APP)
this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, sslProtocol);
#else
Expand Down Expand Up @@ -567,7 +572,17 @@ public byte Connect(string clientId,
// start thread for receiving messages from broker
Fx.StartThread(this.ReceiveThread);

MqttMsgConnack connack = (MqttMsgConnack)this.SendReceive(connect);
MqttMsgConnack connack = null;
try
{
connack = (MqttMsgConnack)this.SendReceive(connect);
}
catch (MqttCommunicationException)
{
this.isRunning = false;
throw;
}

// if connection accepted, start keep alive timer and
if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
{
Expand Down Expand Up @@ -649,6 +664,9 @@ private void Close()
if (this.receiveEventWaitHandle != null)
this.receiveEventWaitHandle.Set();

if (this.closeEventWaitHandle != null)
this.closeEventWaitHandle.Set();

// wait end process inflight thread
if (this.inflightWaitHandle != null)
this.inflightWaitHandle.Set();
Expand Down Expand Up @@ -1590,6 +1608,7 @@ private void ReceiveThread()
{
// wake up thread that will notify connection is closing
this.OnConnectionClosing();
this.closeEventWaitHandle.WaitOne();
}
}
catch (Exception e)
Expand Down
48 changes: 41 additions & 7 deletions M2Mqtt/Net/MqttNetworkChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class MqttNetworkChannel : IMqttNetworkChannel
// SSL/TLS protocol version
private MqttSslProtocols sslProtocol;

// Connection timeout for ssl authentication
private int connectTimeout;

/// <summary>
/// Remote host name
/// </summary>
Expand Down Expand Up @@ -112,7 +115,7 @@ public bool DataAvailable
/// <param name="socket">Socket opened with the client</param>
public MqttNetworkChannel(Socket socket)
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
: this(socket, false, null, MqttSslProtocols.None, null, null)
: this(socket, false, null, MqttSslProtocols.None, 0, null, null)
#else
: this(socket, false, null, MqttSslProtocols.None)
#endif
Expand All @@ -130,7 +133,7 @@ public MqttNetworkChannel(Socket socket)
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
/// <param name="userCertificateSelectionCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateValidationCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
public MqttNetworkChannel(Socket socket, bool secure, X509Certificate serverCert, MqttSslProtocols sslProtocol,
public MqttNetworkChannel(Socket socket, bool secure, X509Certificate serverCert, MqttSslProtocols sslProtocol, int connectTimeout,
RemoteCertificateValidationCallback userCertificateValidationCallback,
LocalCertificateSelectionCallback userCertificateSelectionCallback)
#else
Expand All @@ -141,6 +144,7 @@ public MqttNetworkChannel(Socket socket, bool secure, X509Certificate serverCert
this.secure = secure;
this.serverCert = serverCert;
this.sslProtocol = sslProtocol;
this.connectTimeout = connectTimeout;
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
this.userCertificateValidationCallback = userCertificateValidationCallback;
this.userCertificateSelectionCallback = userCertificateSelectionCallback;
Expand All @@ -154,7 +158,7 @@ public MqttNetworkChannel(Socket socket, bool secure, X509Certificate serverCert
/// <param name="remotePort">Remote port</param>
public MqttNetworkChannel(string remoteHostName, int remotePort)
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
: this(remoteHostName, remotePort, false, null, null, MqttSslProtocols.None, null, null)
: this(remoteHostName, remotePort, false, null, null, MqttSslProtocols.None, 0, null, null)
#else
: this(remoteHostName, remotePort, false, null, null, MqttSslProtocols.None)
#endif
Expand All @@ -173,7 +177,7 @@ public MqttNetworkChannel(string remoteHostName, int remotePort)
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
/// <param name="userCertificateSelectionCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateValidationCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
public MqttNetworkChannel(string remoteHostName, int remotePort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
public MqttNetworkChannel(string remoteHostName, int remotePort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout,
RemoteCertificateValidationCallback userCertificateValidationCallback,
LocalCertificateSelectionCallback userCertificateSelectionCallback)
#else
Expand Down Expand Up @@ -215,6 +219,7 @@ public MqttNetworkChannel(string remoteHostName, int remotePort, bool secure, X5
this.caCert = caCert;
this.clientCert = clientCert;
this.sslProtocol = sslProtocol;
this.connectTimeout = connectTimeout;
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
this.userCertificateValidationCallback = userCertificateValidationCallback;
this.userCertificateSelectionCallback = userCertificateSelectionCallback;
Expand Down Expand Up @@ -255,11 +260,40 @@ public void Connect()
if (this.clientCert != null)
clientCertificates = new X509CertificateCollection(new X509Certificate[] { this.clientCert });

this.sslStream.AuthenticateAsClient(this.remoteHostName,
IAsyncResult result = null;
try
{
result = this.sslStream.BeginAuthenticateAsClient(this.remoteHostName,
clientCertificates,
MqttSslUtility.ToSslPlatformEnum(this.sslProtocol),
false);

false,
null,
null);
if (!result.AsyncWaitHandle.WaitOne(this.connectTimeout))
{
throw new Exception(string.Format("Timeout in SSL Authentication. connectTimeout={0}", connectTimeout));
}
this.sslStream.EndAuthenticateAsClient(result);
if (!this.sslStream.IsAuthenticated || !this.sslStream.CanRead)
{
throw new Exception("Authentication error.");
}
}
catch (Exception ex)
{
if (this.sslStream.CanRead)
{
this.sslStream.Close();
}
throw;
}
finally
{
if (result != null)
{
result.AsyncWaitHandle.Close();
}
}
#endif
}
#endif
Expand Down