-
Notifications
You must be signed in to change notification settings - Fork 428
/
Copy pathclients.py
241 lines (197 loc) · 10.4 KB
/
clients.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# /*
# * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# *
# * Licensed under the Apache License, Version 2.0 (the "License").
# * You may not use this file except in compliance with the License.
# * A copy of the License is located at
# *
# * http://aws.amazon.com/apache2.0
# *
# * or in the "license" file accompanying this file. This file is distributed
# * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# * express or implied. See the License for the specific language governing
# * permissions and limitations under the License.
# */
import ssl
import logging
from threading import Lock
from numbers import Number
import AWSIoTPythonSDK.core.protocol.paho.client as mqtt
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS
from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids
class ClientStatus(object):
    """Integer codes used as client status values.

    The codes are consecutive integers starting at 0, in the order listed.
    """

    (
        IDLE,
        CONNECT,
        RESUBSCRIBE,
        DRAINING,
        STABLE,
        USER_DISCONNECT,
        ABNORMAL_DISCONNECT,
    ) = range(7)
class ClientStatusContainer(object):
    """Holds one ``ClientStatus`` value and filters updates after a user disconnect."""

    def __init__(self):
        # A new container starts in the IDLE state.
        self._status = ClientStatus.IDLE

    def get_status(self):
        """Return the status value currently stored."""
        return self._status

    def set_status(self, status):
        """Store ``status``, unless a user-requested disconnect blocks the update.

        While the stored status is ``USER_DISCONNECT``, the only accepted
        update is ``CONNECT``; every other value is silently ignored. In any
        other state the new value is stored unconditionally.
        """
        user_disconnected = ClientStatus.USER_DISCONNECT == self._status
        if user_disconnected and not (ClientStatus.CONNECT == status):
            # User asked to disconnect: drop anything that is not a user connect.
            return
        self._status = status
class InternalAsyncMqttClient(object):
    """Asynchronous wrapper around the SDK's bundled Paho MQTT client.

    The wrapper forwards configuration and pub/sub requests to the underlying
    Paho client and keeps a lock-protected map from event ids ("mids") to
    callbacks. Callbacks for publish/subscribe/unsubscribe are keyed by the
    numeric mid returned by Paho and are removed after they have been invoked
    once; callbacks for CONNACK/DISCONNECT/MESSAGE are keyed by the
    ``FixedEventMids`` constants and stay registered.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, client_id, clean_session, protocol, use_wss):
        """Create the underlying Paho client and an empty event callback map.

        :param client_id: passed through to ``mqtt.Client``.
        :param clean_session: passed through to ``mqtt.Client``.
        :param protocol: passed through to ``mqtt.Client``.
        :param use_wss: passed through to ``mqtt.Client``; also selects which
            TLS configuration ``set_cert_credentials_provider`` applies.
        """
        self._paho_client = self._create_paho_client(client_id, clean_session, None, protocol, use_wss)
        self._use_wss = use_wss
        self._event_callback_map_lock = Lock()
        self._event_callback_map = dict()

    def _create_paho_client(self, client_id, clean_session, user_data, protocol, use_wss):
        """Build and return the ``mqtt.Client`` instance used by this wrapper."""
        self._logger.debug("Initializing MQTT layer...")
        return mqtt.Client(client_id, clean_session, user_data, protocol, use_wss)

    # TODO: Merge credentials providers configuration into one
    def set_cert_credentials_provider(self, cert_credentials_provider):
        """Configure TLS on the Paho client from a certificate credentials provider.

        With ``use_wss`` only the CA file is configured; otherwise the client
        certificate and private key are configured as well (mutual auth).

        :param cert_credentials_provider: object exposing ``get_ca_path()``
            and, for the non-websocket case, ``get_cert_path()`` and
            ``get_key_path()``.
        """
        # Historical issue from the Yun SDK, where AR9331 embedded Linux only has Python 2.7.3
        # pre-installed. In that version, TLSv1_2 is not even an option.
        # SSLv23 is a work-around which selects the highest TLS version supported by both the
        # client and the service. If the user installs OpenSSL v1.0.1+, this option works fine
        # for Mutual Auth.
        # Note that we cannot force TLSv1.2 for Mutual Auth. in Python 2.7.3, and TLS support
        # in Python only starts from Python 2.7.
        # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
        ca_path = cert_credentials_provider.get_ca_path()
        if self._use_wss:
            self._paho_client.tls_set(ca_certs=ca_path, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
        else:
            cert_path = cert_credentials_provider.get_cert_path()
            key_path = cert_credentials_provider.get_key_path()
            self._paho_client.tls_set(ca_certs=ca_path, certfile=cert_path, keyfile=key_path,
                                      cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)

    def set_iam_credentials_provider(self, iam_credentials_provider):
        """Hand the provider's access key id, secret key and session token to the Paho client."""
        self._paho_client.configIAMCredentials(iam_credentials_provider.get_access_key_id(),
                                               iam_credentials_provider.get_secret_access_key(),
                                               iam_credentials_provider.get_session_token())

    def set_endpoint_provider(self, endpoint_provider):
        """Remember the provider that ``connect`` queries for host and port."""
        self._endpoint_provider = endpoint_provider

    def configure_last_will(self, topic, payload, qos, retain=False):
        """Forward the last-will settings to the Paho client's ``will_set``."""
        self._paho_client.will_set(topic, payload, qos, retain)

    def configure_alpn_protocols(self, alpn_protocols):
        """Forward the ALPN protocol list to the Paho client."""
        self._paho_client.config_alpn_protocols(alpn_protocols)

    def clear_last_will(self):
        """Forward to the Paho client's ``will_clear``."""
        self._paho_client.will_clear()

    def set_username_password(self, username, password=None):
        """Forward the username/password to the Paho client's ``username_pw_set``."""
        self._paho_client.username_pw_set(username, password)

    def configure_reconnect_back_off(self, base_reconnect_quiet_sec, max_reconnect_quiet_sec, stable_connection_sec):
        """Forward the reconnect back-off timing to the Paho client's ``setBackoffTiming``."""
        self._paho_client.setBackoffTiming(base_reconnect_quiet_sec, max_reconnect_quiet_sec, stable_connection_sec)

    def connect(self, keep_alive_sec, ack_callback=None):
        """Register the fixed event callbacks and start connecting.

        ``set_endpoint_provider`` must have been called first; it is the only
        place in this class that sets the provider read here.

        :param keep_alive_sec: passed through to the Paho client's ``connect``.
        :param ack_callback: optional callable invoked as
            ``ack_callback(mid, data)`` after ``on_online`` when the CONNACK
            event callback is invoked.
        :return: the return code of the Paho client's ``connect``. The
            background network I/O loop is started only when it equals
            ``MQTT_ERR_SUCCESS``.
        """
        host = self._endpoint_provider.get_host()
        port = self._endpoint_provider.get_port()

        with self._event_callback_map_lock:
            self._logger.debug("Filling in fixed event callbacks: CONNACK, DISCONNECT, MESSAGE")
            self._event_callback_map[FixedEventMids.CONNACK_MID] = self._create_combined_on_connect_callback(ack_callback)
            self._event_callback_map[FixedEventMids.DISCONNECT_MID] = self._create_combined_on_disconnect_callback(None)
            self._event_callback_map[FixedEventMids.MESSAGE_MID] = self._create_converted_on_message_callback()

        rc = self._paho_client.connect(host, port, keep_alive_sec)
        if MQTT_ERR_SUCCESS == rc:
            self.start_background_network_io()

        return rc

    def start_background_network_io(self):
        """Start the Paho client's network loop via ``loop_start``."""
        self._logger.debug("Starting network I/O thread...")
        self._paho_client.loop_start()

    def stop_background_network_io(self):
        """Stop the Paho client's network loop via ``loop_stop``."""
        self._logger.debug("Stopping network I/O thread...")
        self._paho_client.loop_stop()

    def disconnect(self, ack_callback=None):
        """Request a disconnect and, on success, install the disconnect callback.

        The callback map lock is held across the Paho ``disconnect`` call and
        the registration, so ``invoke_event_callback`` cannot look up the
        DISCONNECT entry in between.

        :param ack_callback: optional callable invoked as
            ``ack_callback(mid, data)`` after ``on_offline`` when the
            DISCONNECT event callback is invoked.
        :return: the return code of the Paho client's ``disconnect``.
        """
        with self._event_callback_map_lock:
            rc = self._paho_client.disconnect()
            if MQTT_ERR_SUCCESS == rc:
                self._logger.debug("Filling in custom disconnect event callback...")
                combined_on_disconnect_callback = self._create_combined_on_disconnect_callback(ack_callback)
                self._event_callback_map[FixedEventMids.DISCONNECT_MID] = combined_on_disconnect_callback
        return rc

    def _create_combined_on_connect_callback(self, ack_callback):
        """Return a callback that calls ``on_online`` and then ``ack_callback`` (if truthy)."""
        def combined_on_connect_callback(mid, data):
            self.on_online()
            if ack_callback:
                ack_callback(mid, data)
        return combined_on_connect_callback

    def _create_combined_on_disconnect_callback(self, ack_callback):
        """Return a callback that calls ``on_offline`` and then ``ack_callback`` (if truthy)."""
        def combined_on_disconnect_callback(mid, data):
            self.on_offline()
            if ack_callback:
                ack_callback(mid, data)
        return combined_on_disconnect_callback

    def _create_converted_on_message_callback(self):
        """Return a ``(mid, data)`` callback that forwards only ``data`` to ``on_message``."""
        def converted_on_message_callback(mid, data):
            self.on_message(data)
        return converted_on_message_callback

    # For client online notification
    def on_online(self):
        """Hook called from the CONNACK callback; does nothing by default."""
        pass

    # For client offline notification
    def on_offline(self):
        """Hook called from the DISCONNECT callback; does nothing by default."""
        pass

    # For client message reception notification
    def on_message(self, message):
        """Hook called from the MESSAGE callback with the message; does nothing by default."""
        pass

    def publish(self, topic, payload, qos, retain=False, ack_callback=None):
        """Publish through the Paho client and optionally register a puback callback.

        ``ack_callback`` is stored under the returned mid only when the
        publish return code is ``MQTT_ERR_SUCCESS``, ``qos`` is greater than 0
        and ``ack_callback`` is truthy.

        :return: the ``(rc, mid)`` pair returned by the Paho client's ``publish``.
        """
        with self._event_callback_map_lock:
            rc, mid = self._paho_client.publish(topic, payload, qos, retain)
            if MQTT_ERR_SUCCESS == rc and qos > 0 and ack_callback:
                self._logger.debug("Filling in custom puback (QoS>0) event callback...")
                self._event_callback_map[mid] = ack_callback
        return rc, mid

    def subscribe(self, topic, qos, ack_callback=None):
        """Subscribe through the Paho client and optionally register a suback callback.

        ``ack_callback`` is stored under the returned mid only when the return
        code is ``MQTT_ERR_SUCCESS`` and ``ack_callback`` is truthy.

        :return: the ``(rc, mid)`` pair returned by the Paho client's ``subscribe``.
        """
        with self._event_callback_map_lock:
            rc, mid = self._paho_client.subscribe(topic, qos)
            if MQTT_ERR_SUCCESS == rc and ack_callback:
                self._logger.debug("Filling in custom suback event callback...")
                self._event_callback_map[mid] = ack_callback
        return rc, mid

    def unsubscribe(self, topic, ack_callback=None):
        """Unsubscribe through the Paho client and optionally register an unsuback callback.

        ``ack_callback`` is stored under the returned mid only when the return
        code is ``MQTT_ERR_SUCCESS`` and ``ack_callback`` is truthy.

        :return: the ``(rc, mid)`` pair returned by the Paho client's ``unsubscribe``.
        """
        with self._event_callback_map_lock:
            rc, mid = self._paho_client.unsubscribe(topic)
            if MQTT_ERR_SUCCESS == rc and ack_callback:
                self._logger.debug("Filling in custom unsuback event callback...")
                self._event_callback_map[mid] = ack_callback
        return rc, mid

    def register_internal_event_callbacks(self, on_connect, on_disconnect, on_publish, on_subscribe, on_unsubscribe, on_message):
        """Assign the six given handlers to the Paho client's ``on_*`` attributes."""
        self._logger.debug("Registering internal event callbacks to MQTT layer...")
        self._paho_client.on_connect = on_connect
        self._paho_client.on_disconnect = on_disconnect
        self._paho_client.on_publish = on_publish
        self._paho_client.on_subscribe = on_subscribe
        self._paho_client.on_unsubscribe = on_unsubscribe
        self._paho_client.on_message = on_message

    def unregister_internal_event_callbacks(self):
        """Reset all six of the Paho client's ``on_*`` attributes to ``None``."""
        self._logger.debug("Unregistering internal event callbacks from MQTT layer...")
        self._paho_client.on_connect = None
        self._paho_client.on_disconnect = None
        self._paho_client.on_publish = None
        self._paho_client.on_subscribe = None
        self._paho_client.on_unsubscribe = None
        self._paho_client.on_message = None

    def invoke_event_callback(self, mid, data=None):
        """Invoke the callback registered for ``mid``, if there is one.

        The callback is called with keyword arguments: ``mid`` always, and
        ``data`` only when it is not ``None``. Callbacks stored under a
        numeric mid (pub/sub/unsub) are removed after the call; callbacks
        stored under any other key (CONNACK/DISCONNECT/MESSAGE) are kept.

        :param mid: key to look up in the event callback map.
        :param data: optional payload forwarded to the callback.
        """
        with self._event_callback_map_lock:
            event_callback = self._event_callback_map.get(mid)
        # For invoking the event callback, we do not need to acquire the lock
        if event_callback:
            self._logger.debug("Invoking custom event callback...")
            if data is not None:
                event_callback(mid=mid, data=data)
            else:
                event_callback(mid=mid)
            if isinstance(mid, Number):  # Do NOT remove callbacks for CONNACK/DISCONNECT/MESSAGE
                self._logger.debug("This custom event callback is for pub/sub/unsub, removing it after invocation...")
                with self._event_callback_map_lock:
                    # The lock was released while the callback ran, so the entry
                    # may already be gone (removed by the callback itself or by
                    # remove_event_callback/clean_up_event_callbacks elsewhere).
                    # pop() tolerates that where ``del`` would raise KeyError.
                    self._event_callback_map.pop(mid, None)

    def remove_event_callback(self, mid):
        """Remove the callback registered for ``mid``; do nothing if there is none."""
        with self._event_callback_map_lock:
            if mid in self._event_callback_map:
                self._logger.debug("Removing custom event callback...")
                del self._event_callback_map[mid]

    def clean_up_event_callbacks(self):
        """Remove every registered event callback, including the fixed ones."""
        with self._event_callback_map_lock:
            self._event_callback_map.clear()

    def get_event_callback_map(self):
        """Return the live callback map itself (not a copy); the lock is not taken."""
        return self._event_callback_map