@@ -128,6 +128,7 @@ class MQTT:
128128 :param str client_id: Optional client identifier, defaults to a unique, generated string.
129129 :param bool is_ssl: Sets a secure or insecure connection with the broker.
130130 :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
131+ :param int recv_timeout: receive timeout, in seconds.
131132 :param socket socket_pool: A pool of socket resources available for the given radio.
132133 :param ssl_context: SSL context for long-lived SSL connections.
133134 :param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks.
@@ -146,6 +147,7 @@ def __init__(
146147 client_id = None ,
147148 is_ssl = True ,
148149 keep_alive = 60 ,
150+ recv_timeout = 10 ,
149151 socket_pool = None ,
150152 ssl_context = None ,
151153 use_binary_mode = False ,
@@ -157,7 +159,13 @@ def __init__(
157159 self ._sock = None
158160 self ._backwards_compatible_sock = False
159161 self ._use_binary_mode = use_binary_mode
162+
163+ if recv_timeout <= socket_timeout :
164+ raise MMQTTException (
165+ "recv_timeout must be strictly greater than socket_timeout"
166+ )
160167 self ._socket_timeout = socket_timeout
168+ self ._recv_timeout = recv_timeout
161169
162170 self .keep_alive = keep_alive
163171 self ._user_data = None
@@ -522,6 +530,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
522530 self ._send_str (self ._password )
523531 if self .logger is not None :
524532 self .logger .debug ("Receiving CONNACK packet from broker" )
533+ stamp = time .monotonic ()
525534 while True :
526535 op = self ._wait_for_msg ()
527536 if op == 32 :
@@ -535,6 +544,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
535544 self .on_connect (self , self ._user_data , result , rc [2 ])
536545 return result
537546
547+ if op is None :
548+ if time .monotonic () - stamp > self ._recv_timeout :
549+ raise MMQTTException (
550+ f"No data received from broker for { self ._recv_timeout } seconds."
551+ )
552+
538553 def disconnect (self ):
539554 """Disconnects the MiniMQTT client from the MQTT broker."""
540555 self .is_connected ()
@@ -645,6 +660,7 @@ def publish(self, topic, msg, retain=False, qos=0):
645660 if qos == 0 and self .on_publish is not None :
646661 self .on_publish (self , self ._user_data , topic , self ._pid )
647662 if qos == 1 :
663+ stamp = time .monotonic ()
648664 while True :
649665 op = self ._wait_for_msg ()
650666 if op == 0x40 :
@@ -657,6 +673,12 @@ def publish(self, topic, msg, retain=False, qos=0):
657673 self .on_publish (self , self ._user_data , topic , rcv_pid )
658674 return
659675
676+ if op is None :
677+ if time .monotonic () - stamp > self ._recv_timeout :
678+ raise MMQTTException (
679+ f"No data received from broker for { self ._recv_timeout } seconds."
680+ )
681+
660682 def subscribe (self , topic , qos = 0 ):
661683 """Subscribes to a topic on the MQTT Broker.
662684 This method can subscribe to one topics or multiple topics.
@@ -705,6 +727,7 @@ def subscribe(self, topic, qos=0):
705727 for t , q in topics :
706728 self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
707729 self ._sock .send (packet )
730+ stamp = time .monotonic ()
708731 while True :
709732 op = self ._wait_for_msg ()
710733 if op == 0x90 :
@@ -718,6 +741,12 @@ def subscribe(self, topic, qos=0):
718741 self ._subscribed_topics .append (t )
719742 return
720743
744+ if op is None :
745+ if time .monotonic () - stamp > self ._recv_timeout :
746+ raise MMQTTException (
747+ f"No data received from broker for { self ._recv_timeout } seconds."
748+ )
749+
721750 def unsubscribe (self , topic ):
722751 """Unsubscribes from a MQTT topic.
723752
@@ -755,6 +784,7 @@ def unsubscribe(self, topic):
755784 if self .logger is not None :
756785 self .logger .debug ("Waiting for UNSUBACK..." )
757786 while True :
787+ stamp = time .monotonic ()
758788 op = self ._wait_for_msg ()
759789 if op == 176 :
760790 rc = self ._sock_exact_recv (3 )
@@ -767,6 +797,12 @@ def unsubscribe(self, topic):
767797 self ._subscribed_topics .remove (t )
768798 return
769799
800+ if op is None :
801+ if time .monotonic () - stamp > self ._recv_timeout :
802+ raise MMQTTException (
803+ f"No data received from broker for { self ._recv_timeout } seconds."
804+ )
805+
770806 def reconnect (self , resub_topics = True ):
771807 """Attempts to reconnect to the MQTT broker.
772808
0 commit comments