Skip to content

Commit 47363e9

Browse files
committed
Release of version 1.3.1
1 parent 4b4f626 commit 47363e9

File tree

8 files changed

+227
-13
lines changed

8 files changed

+227
-13
lines changed

AWSIoTPythonSDK/MQTTLib.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,9 @@ def subscribe(self, topic, QoS, callback):
652652
653653
*callback* - Function to be called when a new message for the subscribed topic
654654
comes in. Should be in form :code:`customCallback(client, userdata, message)`, where
655-
:code:`message` contains :code:`topic` and :code:`payload`.
655+
:code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are
656+
here just to be aligned with the underneath Paho callback function signature. These fields are pending to be
657+
deprecated and should not be depended on.
656658
657659
**Returns**
658660
@@ -688,7 +690,9 @@ def subscribeAsync(self, topic, QoS, ackCallback=None, messageCallback=None):
688690
689691
*messageCallback* - Function to be called when a new message for the subscribed topic
690692
comes in. Should be in form :code:`customCallback(client, userdata, message)`, where
691-
:code:`message` contains :code:`topic` and :code:`payload`.
693+
:code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are
694+
here just to be aligned with the underneath Paho callback function signature. These fields are pending to be
695+
deprecated and should not be depended on.
692696
693697
**Returns**
694698

AWSIoTPythonSDK/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
__version__ = "1.3.0"
1+
__version__ = "1.3.1"
22

33

AWSIoTPythonSDK/core/protocol/internal/clients.py

+12-10
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ def combined_on_disconnect_callback(mid, data):
151151
return combined_on_disconnect_callback
152152

153153
def _create_converted_on_message_callback(self):
154-
def converted_on_message_callback(mid, message):
155-
self.on_message(message)
154+
def converted_on_message_callback(mid, data):
155+
self.on_message(data)
156156
return converted_on_message_callback
157157

158158
# For client online notification
@@ -212,14 +212,16 @@ def unregister_internal_event_callbacks(self):
212212
def invoke_event_callback(self, mid, data=None):
213213
with self._event_callback_map_lock:
214214
event_callback = self._event_callback_map.get(mid)
215-
if event_callback:
216-
self._logger.debug("Invoking custom event callback...")
217-
if data is not None:
218-
event_callback(mid, data)
219-
else:
220-
event_callback(mid)
221-
if isinstance(mid, Number): # Do NOT remove callbacks for CONNACK/DISCONNECT/MESSAGE
222-
self._logger.debug("This custom event callback is for pub/sub/unsub, removing it after invocation...")
215+
# For invoking the event callback, we do not need to acquire the lock
216+
if event_callback:
217+
self._logger.debug("Invoking custom event callback...")
218+
if data is not None:
219+
event_callback(mid=mid, data=data)
220+
else:
221+
event_callback(mid=mid)
222+
if isinstance(mid, Number): # Do NOT remove callbacks for CONNACK/DISCONNECT/MESSAGE
223+
self._logger.debug("This custom event callback is for pub/sub/unsub, removing it after invocation...")
224+
with self._event_callback_map_lock:
223225
del self._event_callback_map[mid]
224226

225227
def remove_event_callback(self, mid):

AWSIoTPythonSDK/core/protocol/internal/workers.py

+12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import time
1717
import logging
1818
from threading import Thread
19+
from threading import Event
1920
from AWSIoTPythonSDK.core.protocol.internal.events import EventTypes
2021
from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids
2122
from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus
@@ -91,6 +92,7 @@ def __init__(self, cv, event_queue, internal_async_client,
9192
RequestTypes.SUBSCRIBE : self._handle_offline_subscribe,
9293
RequestTypes.UNSUBSCRIBE : self._handle_offline_unsubscribe
9394
}
95+
self._stopper = Event()
9496

9597
def update_offline_requests_manager(self, offline_requests_manager):
9698
self._offline_requests_manager = offline_requests_manager
@@ -105,6 +107,7 @@ def is_running(self):
105107
return self._is_running
106108

107109
def start(self):
110+
self._stopper.clear()
108111
self._is_running = True
109112
dispatch_events = Thread(target=self._dispatch)
110113
dispatch_events.daemon = True
@@ -127,6 +130,13 @@ def _clean_up(self):
127130
self._internal_async_client.clean_up_event_callbacks()
128131
self._logger.debug("Event callbacks cleared")
129132

133+
def wait_until_it_stops(self, timeout_sec):
134+
self._logger.debug("Waiting for event consumer to completely stop")
135+
return self._stopper.wait(timeout=timeout_sec)
136+
137+
def is_fully_stopped(self):
138+
return self._stopper.is_set()
139+
130140
def _dispatch(self):
131141
while self._is_running:
132142
with self._cv:
@@ -135,6 +145,8 @@ def _dispatch(self):
135145
else:
136146
while not self._event_queue.empty():
137147
self._dispatch_one()
148+
self._stopper.set()
149+
self._logger.debug("Exiting dispatching loop...")
138150

139151
def _dispatch_one(self):
140152
mid, event_type, data = self._event_queue.get()

AWSIoTPythonSDK/core/protocol/mqtt_core.py

+3
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ def disconnect(self):
219219
if not event.wait(self._connect_disconnect_timeout_sec):
220220
self._logger.error("Disconnect timed out")
221221
raise disconnectTimeoutException()
222+
if not self._event_consumer.wait_until_it_stops(self._connect_disconnect_timeout_sec):
223+
self._logger.error("Disconnect timed out in waiting for event consumer")
224+
raise disconnectTimeoutException()
222225
return True
223226

224227
def disconnect_async(self, ack_callback=None):

CHANGELOG.rst

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22
CHANGELOG
33
=========
44

5+
1.3.1
6+
=====
7+
* bugfix:Issue:`#67 <https://github.com/aws/aws-iot-device-sdk-python/issues/67>`__
8+
* bugfix:Fixed a dead lock issue when client async API is called within the event callback
9+
* bugfix:Updated README and API documentation to provide clear usage information on sync/async API and callbacks
10+
* improvement:Added a new sample to show API usage within callbacks
11+
512
1.3.0
613
=====
714
* bugfix:WebSocket handshake response timeout and error escalation

README.rst

+61
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,36 @@ API documentation for ``AWSIoTPythonSDK.core.greengrass.discovery.models``,
348348
`Greengrass Discovery documentation <http://docs.aws.amazon.com/greengrass/latest/developerguide/gg-discover-api.html>`__
349349
or `Greengrass overall documentation <http://docs.aws.amazon.com/greengrass/latest/developerguide/what-is-gg.html>`__.
350350

351+
352+
Synchronous APIs and Asynchronous APIs
353+
______________________________________
354+
355+
Beginning with Release v1.2.0, SDK provides asynchronous APIs and enforces synchronous API behaviors for MQTT operations,
356+
which includes:
357+
- connect/connectAsync
358+
- disconnect/disconnectAsync
359+
- publish/publishAsync
360+
- subscribe/subscribeAsync
361+
- unsubscribe/unsubscribeAsync
362+
363+
- Asynchronous APIs
364+
Asynchronous APIs translate the invocation into MQTT packet and forward it to the underneath connection to be sent out.
365+
They return immediately once packets are out for delivery, regardless of whether the corresponding ACKs, if any, have
366+
been received. Users can specify their own callbacks for ACK/message (server side PUBLISH) processing for each
367+
individual request. These callbacks will be sequentially dispatched and invoked upon the arrival of ACK/message (server
368+
side PUBLISH) packets.
369+
370+
- Synchronous APIs
371+
Synchronous API behaviors are enforced by registering blocking ACK callbacks on top of the asynchronous APIs.
372+
Synchronous APIs wait on their corresponding ACK packets, if there is any, before the invocation returns. For example,
373+
a synchronous QoS1 publish call will wait until it gets its PUBACK back. A synchronous subscribe call will wait until
374+
it gets its SUBACK back. Users can configure operation time out for synchronous APIs to stop the waiting.
375+
376+
Since callbacks are sequentially dispatched and invoked, calling synchronous APIs within callbacks will deadlock the
377+
user application. If users are inclined to utilize the asynchronous mode and perform MQTT operations
378+
within callbacks, asynchronous APIs should be used. For more details, please check out the provided samples at
379+
``samples/basicPubSub/basicPubSub_APICallInCallback.py``
380+
351381
.. _Key_Features:
352382

353383
Key Features
@@ -642,6 +672,37 @@ Source
642672

643673
The example is available in ``samples/basicPubSub/``.
644674

675+
BasicPubSub with API invocation in callback
676+
___________
677+
678+
This example demonstrates the usage of asynchronous APIs within callbacks. It first connects to AWS IoT and subscribes
679+
to 2 topics with the corresponding message callbacks registered. One message callback contains client asynchronous API
680+
invocation that republishes the received message from <topic> to <topic>/republish. The other message callback simply
681+
prints out the received message. It then publishes messages to <topic> in an infinite loop. For every message received
682+
from <topic>, it will be republished to <topic>/republish and be printed out as configured in the simple print-out
683+
message callback.
684+
New ack packet ids are printed upon reception of PUBACK and SUBACK through ACK callbacks registered with asynchronous
685+
API calls, indicating that the the client received ACKs for the corresponding asynchronous API calls.
686+
687+
Instructions
688+
************
689+
690+
Run the example like this:
691+
692+
.. code-block:: python
693+
694+
# Certificate based mutual authentication
695+
python basicPubSub_APICallInCallback.py -e <endpoint> -r <rootCAFilePath> -c <certFilePath> -k <privateKeyFilePath>
696+
# MQTT over WebSocket
697+
python basicPubSub_APICallInCallback.py -e <endpoint> -r <rootCAFilePath> -w
698+
# Customize client id and topic
699+
python basicPubSub_APICallInCallback.py -e <endpoint> -r <rootCAFilePath> -c <certFilePath> -k <privateKeyFilePath> -id <clientId> -t <topic>
700+
701+
Source
702+
******
703+
704+
The example is available in ``samples/basicPubSub/``.
705+
645706
BasicShadow
646707
___________
647708

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
'''
2+
/*
3+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License").
6+
* You may not use this file except in compliance with the License.
7+
* A copy of the License is located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed
12+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
'''
17+
18+
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
19+
import logging
20+
import time
21+
import argparse
22+
23+
24+
class CallbackContainer(object):
25+
26+
def __init__(self, client):
27+
self._client = client
28+
29+
def messagePrint(self, client, userdata, message):
30+
print("Received a new message: ")
31+
print(message.payload)
32+
print("from topic: ")
33+
print(message.topic)
34+
print("--------------\n\n")
35+
36+
def messageForward(self, client, userdata, message):
37+
topicRepublish = message.topic + "/republish"
38+
print("Forwarding message from: %s to %s" % (message.topic, topicRepublish))
39+
print("--------------\n\n")
40+
self._client.publishAsync(topicRepublish, str(message.payload), 1, self.pubackCallback)
41+
42+
def pubackCallback(self, mid):
43+
print("Received PUBACK packet id: ")
44+
print(mid)
45+
print("++++++++++++++\n\n")
46+
47+
def subackCallback(self, mid, data):
48+
print("Received SUBACK packet id: ")
49+
print(mid)
50+
print("Granted QoS: ")
51+
print(data)
52+
print("++++++++++++++\n\n")
53+
54+
55+
# Read in command-line parameters
56+
parser = argparse.ArgumentParser()
57+
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
58+
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
59+
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
60+
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
61+
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
62+
help="Use MQTT over WebSocket")
63+
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
64+
help="Targeted client id")
65+
parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")
66+
67+
args = parser.parse_args()
68+
host = args.host
69+
rootCAPath = args.rootCAPath
70+
certificatePath = args.certificatePath
71+
privateKeyPath = args.privateKeyPath
72+
useWebsocket = args.useWebsocket
73+
clientId = args.clientId
74+
topic = args.topic
75+
76+
if args.useWebsocket and args.certificatePath and args.privateKeyPath:
77+
parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
78+
exit(2)
79+
80+
if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
81+
parser.error("Missing credentials for authentication.")
82+
exit(2)
83+
84+
# Configure logging
85+
logger = logging.getLogger("AWSIoTPythonSDK.core")
86+
logger.setLevel(logging.DEBUG)
87+
streamHandler = logging.StreamHandler()
88+
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
89+
streamHandler.setFormatter(formatter)
90+
logger.addHandler(streamHandler)
91+
92+
# Init AWSIoTMQTTClient
93+
myAWSIoTMQTTClient = None
94+
if useWebsocket:
95+
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
96+
myAWSIoTMQTTClient.configureEndpoint(host, 443)
97+
myAWSIoTMQTTClient.configureCredentials(rootCAPath)
98+
else:
99+
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
100+
myAWSIoTMQTTClient.configureEndpoint(host, 8883)
101+
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
102+
103+
# AWSIoTMQTTClient connection configuration
104+
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
105+
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
106+
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
107+
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
108+
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
109+
110+
myCallbackContainer = CallbackContainer(myAWSIoTMQTTClient)
111+
112+
# Connect and subscribe to AWS IoT
113+
myAWSIoTMQTTClient.connect()
114+
115+
# Perform synchronous subscribes
116+
myAWSIoTMQTTClient.subscribe(topic, 1, myCallbackContainer.messageForward)
117+
myAWSIoTMQTTClient.subscribe(topic + "/republish", 1, myCallbackContainer.messagePrint)
118+
time.sleep(2)
119+
120+
# Publish to the same topic in a loop forever
121+
loopCount = 0
122+
while True:
123+
myAWSIoTMQTTClient.publishAsync(topic, "New Message " + str(loopCount), 1, ackCallback=myCallbackContainer.pubackCallback)
124+
loopCount += 1
125+
time.sleep(1)

0 commit comments

Comments
 (0)