Skip to content

Commit

Permalink
Add callback validation for publish and disconnect keywords.
Browse files Browse the repository at this point in the history
  • Loading branch information
randomsync committed Mar 3, 2015
1 parent a3eed90 commit c9241c4
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions src/MQTTLibrary/MQTTKeywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ def connect(self, broker, port=1883, client_id="", clean_session=True):
self._mqttc.connect(broker, int(port))

timer_start = time.time()
while (not self._connected and not self._unexpected_disconnect and
time.time() < timer_start + self._loop_timeout):
while time.time() < timer_start + self._loop_timeout:
if self._connected or self._unexpected_disconnect:
break;
self._mqttc.loop()

if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")
self.builtin.log('client_id: %s' % self._mqttc._client_id, 'DEBUG')
Expand All @@ -83,7 +85,20 @@ def publish(self, topic, message=None, qos=0, retain=False):
"""
self.builtin.log('Publish topic: %s, message: %s, qos: %s, retain: %s'
% (topic, message, qos, retain), 'INFO')
self._mqttc.publish(topic, message, int(qos), retain)
self._mid = -1
self._mqttc.on_publish = self._on_publish
result, mid = self._mqttc.publish(topic, message, int(qos), retain)
if result != 0:
self.builtin.log('Error publishing: %s' % result, 'FAIL')

timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if mid == self._mid:
break;
self._mqttc.loop()

if mid != self._mid:
self.builtin.log('mid wasn\'t matched: %s' % mid, 'WARN')

def subscribe(self, topic, qos, timeout=1, limit=1):
""" Subscribe to a topic and return a list of message payloads received
Expand Down Expand Up @@ -193,8 +208,20 @@ def disconnect(self):
| Disconnect |
"""
self._disconnected = False
self._unexpected_disconnect = False
self._mqttc.on_disconnect = self._on_disconnect

self._mqttc.disconnect()

timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._disconnected or self._unexpected_disconnect:
break;
self._mqttc.loop()
if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")

def _on_message(self, client, userdata, message):
self.builtin.log('Received message: %s on topic: %s with QoS: %s'
% (str(message.payload), message.topic, str(message.qos)), 'DEBUG')
Expand All @@ -209,7 +236,14 @@ def _on_connect(self, client, userdata, flags, rc):
self._connected = True if rc == 0 else False

def _on_disconnect(self, client, userdata, rc):
self._unexpected_disconnect = True if rc != 0 else False
if rc == 0:
self._disconnected = True
self._unexpected_disconnect = False
else:
self._unexpected_disconnect = True

def _on_unsubscribe(self, client, userdata, mid):
self._unsubscribed = True

def _on_publish(self, client, userdata, mid):
self._mid = mid

0 comments on commit c9241c4

Please sign in to comment.