From 28a47e5a0a9aa9c67c7edd1393bcd98edc42dcc9 Mon Sep 17 00:00:00 2001 From: brentru Date: Wed, 3 Feb 2021 16:23:18 -0500 Subject: [PATCH 1/6] update example --- .../cpython/minimqtt_simpletest_cpython.py | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/examples/cpython/minimqtt_simpletest_cpython.py b/examples/cpython/minimqtt_simpletest_cpython.py index 35fe18e9..70d8f9a9 100644 --- a/examples/cpython/minimqtt_simpletest_cpython.py +++ b/examples/cpython/minimqtt_simpletest_cpython.py @@ -1,32 +1,31 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT +import ssl import socket import adafruit_minimqtt.adafruit_minimqtt as MQTT -### Secrets File Setup ### - +# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and +# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other +# source control. +# pylint: disable=no-name-in-module,wrong-import-order try: from secrets import secrets except ImportError: - print("Connection secrets are kept in secrets.py, please add them there!") + print("WiFi secrets are kept in secrets.py, please add them there!") raise ### Topic Setup ### # MQTT Topic # Use this topic if you'd like to connect to a standard MQTT broker -# mqtt_topic = "test/topic" +mqtt_topic = "test/topic" # Adafruit IO-style Topic # Use this topic if you'd like to connect to io.adafruit.com -mqtt_topic = secrets["aio_username"] + "/feeds/temperature" +# mqtt_topic = secrets["aio_username"] + "/feeds/temperature" ### Code ### - -# Keep track of client connection state -disconnect_client = False - # Define callback methods which are called when events occur # pylint: disable=unused-argument, redefined-outer-name def connect(mqtt_client, userdata, flags, rc): @@ -35,40 +34,34 @@ def connect(mqtt_client, userdata, flags, rc): print("Connected to MQTT Broker!") print("Flags: {0}\n RC: {1}".format(flags, rc)) - def disconnect(mqtt_client, userdata, rc): # This method is called when the mqtt_client disconnects # from the broker. print("Disconnected from MQTT Broker!") - def subscribe(mqtt_client, userdata, topic, granted_qos): # This method is called when the mqtt_client subscribes to a new feed. print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) - def unsubscribe(mqtt_client, userdata, topic, pid): # This method is called when the mqtt_client unsubscribes from a feed. print("Unsubscribed from {0} with PID {1}".format(topic, pid)) - def publish(mqtt_client, userdata, topic, pid): # This method is called when the mqtt_client publishes data to a feed. print("Published to {0} with PID {1}".format(topic, pid)) - def message(client, topic, message): # Method callled when a client's subscribed feed has a new value. print("New message on topic {0}: {1}".format(topic, message)) - # Set up a MiniMQTT Client mqtt_client = MQTT.MQTT( broker=secrets["broker"], - port=1883, username=secrets["aio_username"], password=secrets["aio_key"], socket_pool=socket, + ssl_context=ssl.create_default_context() ) # Connect callback handlers to mqtt_client From cafcb054de2c07b1e85c78b6c170c53ac156c4d7 Mon Sep 17 00:00:00 2001 From: brentru Date: Wed, 3 Feb 2021 16:27:50 -0500 Subject: [PATCH 2/6] update CI to search subfolders --- .github/workflows/build.yml | 1 + examples/cpython/minimqtt_simpletest_cpython.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 59baa537..221659e1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -54,6 +54,7 @@ jobs: run: | pylint $( find . -path './adafruit*.py' ) ([[ ! -d "examples" ]] || pylint --disable=missing-docstring,invalid-name,bad-whitespace $( find . -path "./examples/*.py" )) + ([[ ! -d "examples" ]] || pylint --disable=missing-docstring,invalid-name,bad-whitespace $( find . -path "./examples/*/*.py" )) - name: Build assets run: circuitpython-build-bundles --filename_prefix ${{ steps.repo-name.outputs.repo-name }} --library_location . - name: Archive bundles diff --git a/examples/cpython/minimqtt_simpletest_cpython.py b/examples/cpython/minimqtt_simpletest_cpython.py index 70d8f9a9..f0d71a09 100644 --- a/examples/cpython/minimqtt_simpletest_cpython.py +++ b/examples/cpython/minimqtt_simpletest_cpython.py @@ -34,34 +34,40 @@ def connect(mqtt_client, userdata, flags, rc): print("Connected to MQTT Broker!") print("Flags: {0}\n RC: {1}".format(flags, rc)) + def disconnect(mqtt_client, userdata, rc): # This method is called when the mqtt_client disconnects # from the broker. print("Disconnected from MQTT Broker!") + def subscribe(mqtt_client, userdata, topic, granted_qos): # This method is called when the mqtt_client subscribes to a new feed. print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) + def unsubscribe(mqtt_client, userdata, topic, pid): # This method is called when the mqtt_client unsubscribes from a feed. print("Unsubscribed from {0} with PID {1}".format(topic, pid)) + def publish(mqtt_client, userdata, topic, pid): # This method is called when the mqtt_client publishes data to a feed. print("Published to {0} with PID {1}".format(topic, pid)) + def message(client, topic, message): # Method callled when a client's subscribed feed has a new value. print("New message on topic {0}: {1}".format(topic, message)) + # Set up a MiniMQTT Client mqtt_client = MQTT.MQTT( broker=secrets["broker"], username=secrets["aio_username"], password=secrets["aio_key"], socket_pool=socket, - ssl_context=ssl.create_default_context() + ssl_context=ssl.create_default_context(), ) # Connect callback handlers to mqtt_client From 3648e024264eee3e1ffefa332af96456a9e8cd29 Mon Sep 17 00:00:00 2001 From: brentru Date: Wed, 3 Feb 2021 18:23:00 -0500 Subject: [PATCH 3/6] remove socket reuse from requests, set context within set_interface instead, add a global ssl context --- adafruit_minimqtt/adafruit_minimqtt.py | 132 ++++++++----------------- 1 file changed, 39 insertions(+), 93 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index aecde993..33e54953 100755 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -61,9 +61,8 @@ const(0x05): "Connection Refused - Unauthorized", } -_the_interface = None # pylint: disable=invalid-name -_the_sock = None # pylint: disable=invalid-name - +_default_sock = None # pylint: disable=invalid-name +_fake_context = None # pylint: disable=invalid-name class MMQTTException(Exception): """MiniMQTT Exception class.""" @@ -74,18 +73,17 @@ class MMQTTException(Exception): # Legacy ESP32SPI Socket API def set_socket(sock, iface=None): - """Legacy API for setting the socket and network interface, use a Session instead. - + """Legacy API for setting the socket and network interface. :param sock: socket object. :param iface: internet interface object + """ - global _the_sock # pylint: disable=invalid-name, global-statement - _the_sock = sock + global _default_sock # pylint: disable=invalid-name, global-statement + global _fake_context # pylint: disable=invalid-name, global-statement + _default_sock = sock if iface: - global _the_interface # pylint: disable=invalid-name, global-statement - _the_interface = iface - _the_sock.set_interface(iface) - + _default_sock.set_interface(iface) + _fake_context = _FakeSSLContext(iface) class _FakeSSLSocket: def __init__(self, socket, tls_mode): @@ -103,7 +101,6 @@ def connect(self, address): except RuntimeError as error: raise OSError(errno.ENOMEM) from error - class _FakeSSLContext: def __init__(self, iface): self._iface = iface @@ -144,18 +141,7 @@ def __init__( ): self._socket_pool = socket_pool - # Legacy API - if we do not have a socket pool, use default socket - if self._socket_pool is None: - self._socket_pool = _the_sock - self._ssl_context = ssl_context - # Legacy API - if we do not have SSL context, fake it - if self._ssl_context is None: - self._ssl_context = _FakeSSLContext(_the_interface) - - # Hang onto open sockets so that we can reuse them - self._socket_free = {} - self._open_sockets = {} self._sock = None self._backwards_compatible_sock = False @@ -214,93 +200,53 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - # Socket helpers - def _free_socket(self, socket): - """Frees a socket for re-use.""" - if socket not in self._open_sockets.values(): - raise RuntimeError("Socket not from MQTT client.") - self._socket_free[socket] = True - - def _close_socket(self, socket): - """Closes a slocket.""" - socket.close() - del self._socket_free[socket] - key = None - for k in self._open_sockets: - if self._open_sockets[k] == socket: - key = k - break - if key: - del self._open_sockets[key] - - def _free_sockets(self): - """Closes all free sockets.""" - free_sockets = [] - for sock in self._socket_free: - if self._socket_free[sock]: - free_sockets.append(sock) - for sock in free_sockets: - self._close_socket(sock) # pylint: disable=too-many-branches def _get_socket(self, host, port, *, timeout=1): - key = (host, port) - if key in self._open_sockets: - sock = self._open_sockets[key] - if self._socket_free[sock]: - self._socket_free[sock] = False - return sock - if port == 8883 and not self._ssl_context: - raise RuntimeError( - "ssl_context must be set before using adafruit_mqtt for secure MQTT." - ) + # For reconnections - check if we're using a socket already and close it + if self._sock: + self._sock.close() # Legacy API - use a default socket instead of socket pool if self._socket_pool is None: - self._socket_pool = _the_sock + self._socket_pool = _default_sock + + # Legacy API - fake the ssl context + if self._ssl_context is None: + self._ssl_context = _fake_context + + if port == 8883 and self._ssl_context is None: + raise RuntimeError( + "ssl_context must be set before using adafruit_mqtt for secure MQTT." + ) addr_info = self._socket_pool.getaddrinfo( host, port, 0, self._socket_pool.SOCK_STREAM )[0] + retry_count = 0 sock = None - while retry_count < 5 and sock is None: - if retry_count > 0: - if any(self._socket_free.items()): - self._free_sockets() - else: - raise RuntimeError("Sending request failed") - retry_count += 1 - try: - sock = self._socket_pool.socket( - addr_info[0], addr_info[1], addr_info[2] - ) - except OSError: - continue - - connect_host = addr_info[-1][0] - if port == 8883: - sock = self._ssl_context.wrap_socket(sock, server_hostname=host) - connect_host = host - sock.settimeout(timeout) + sock = self._socket_pool.socket( + addr_info[0], addr_info[1], addr_info[2] + ) - try: - sock.connect((connect_host, port)) - except MemoryError: - sock.close() - sock = None - except OSError: - sock.close() - sock = None + connect_host = addr_info[-1][0] + if port == 8883: + sock = self._ssl_context.wrap_socket(sock, server_hostname=host) + connect_host = host + sock.settimeout(timeout) - if sock is None: - raise RuntimeError("Repeated socket failures") + try: + sock.connect((connect_host, port)) + except MemoryError as err: + sock.close() + raise MemoryError(err) + except OSError as err: + sock.close() + raise OSError(err) self._backwards_compatible_sock = not hasattr(sock, "recv_into") - - self._open_sockets[key] = sock - self._socket_free[sock] = False return sock def __enter__(self): From 42439ebf9f80294d1a7ce642fc67cee8845d23ff Mon Sep 17 00:00:00 2001 From: brentru Date: Wed, 3 Feb 2021 18:32:04 -0500 Subject: [PATCH 4/6] lint/black/removing some class info --- adafruit_minimqtt/adafruit_minimqtt.py | 31 +++++++++++++------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 33e54953..251e2894 100755 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -62,7 +62,8 @@ } _default_sock = None # pylint: disable=invalid-name -_fake_context = None # pylint: disable=invalid-name +_fake_context = None # pylint: disable=invalid-name + class MMQTTException(Exception): """MiniMQTT Exception class.""" @@ -85,6 +86,7 @@ def set_socket(sock, iface=None): _default_sock.set_interface(iface) _fake_context = _FakeSSLContext(iface) + class _FakeSSLSocket: def __init__(self, socket, tls_mode): self._socket = socket @@ -101,6 +103,7 @@ def connect(self, address): except RuntimeError as error: raise OSError(errno.ENOMEM) from error + class _FakeSSLContext: def __init__(self, iface): self._iface = iface @@ -200,12 +203,16 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - - # pylint: disable=too-many-branches def _get_socket(self, host, port, *, timeout=1): + """Obtains and connects a new socket to a host. + :param str host: Desired broker hostname + :param int port: Desired broker port + :param int timeout: Desired socket timeout + """ # For reconnections - check if we're using a socket already and close it if self._sock: self._sock.close() + self._sock = None # Legacy API - use a default socket instead of socket pool if self._socket_pool is None: @@ -224,27 +231,21 @@ def _get_socket(self, host, port, *, timeout=1): host, port, 0, self._socket_pool.SOCK_STREAM )[0] - retry_count = 0 sock = None + sock = self._socket_pool.socket(addr_info[0], addr_info[1], addr_info[2]) - sock = self._socket_pool.socket( - addr_info[0], addr_info[1], addr_info[2] - ) - - connect_host = addr_info[-1][0] if port == 8883: sock = self._ssl_context.wrap_socket(sock, server_hostname=host) - connect_host = host - sock.settimeout(timeout) + sock.settimeout(timeout) try: - sock.connect((connect_host, port)) + sock.connect((addr_info[-1][0], port)) except MemoryError as err: sock.close() - raise MemoryError(err) + raise MemoryError from err except OSError as err: sock.close() - raise OSError(err) + raise OSError from err self._backwards_compatible_sock = not hasattr(sock, "recv_into") return sock @@ -409,7 +410,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None): if self.logger: self.logger.debug("Attempting to establish MQTT connection...") - # Attempt to get a new socket + # Get a new socket self._sock = self._get_socket(self.broker, self.port) # Fixed Header From 33441caf20d61a5cc613961d3a033ac732c1e58e Mon Sep 17 00:00:00 2001 From: brentru Date: Thu, 4 Feb 2021 11:05:40 -0500 Subject: [PATCH 5/6] add back retry loop --- adafruit_minimqtt/adafruit_minimqtt.py | 38 ++++++++++++++++++-------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 251e2894..7acace13 100755 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -232,20 +232,34 @@ def _get_socket(self, host, port, *, timeout=1): )[0] sock = None - sock = self._socket_pool.socket(addr_info[0], addr_info[1], addr_info[2]) + retry_count = 0 + while retry_count < 5 and sock is None: + retry_count += 1 - if port == 8883: - sock = self._ssl_context.wrap_socket(sock, server_hostname=host) + try: + sock = self._socket_pool.socket( + addr_info[0], addr_info[1], addr_info[2] + ) + except OSError: + continue - sock.settimeout(timeout) - try: - sock.connect((addr_info[-1][0], port)) - except MemoryError as err: - sock.close() - raise MemoryError from err - except OSError as err: - sock.close() - raise OSError from err + connect_host = addr_info[-1][0] + if port == 8883: + sock = self._ssl_context.wrap_socket(sock, server_hostname=host) + connect_host = host + sock.settimeout(timeout) + + try: + sock.connect((connect_host, port)) + except MemoryError: + sock.close() + sock = None + except OSError: + sock.close() + sock = None + + if sock is None: + raise RuntimeError("Repeated socket failures") self._backwards_compatible_sock = not hasattr(sock, "recv_into") return sock From c9ab6ab34f6740eb6decebf9a004c3270f1b6e7d Mon Sep 17 00:00:00 2001 From: brentru Date: Thu, 4 Feb 2021 11:08:55 -0500 Subject: [PATCH 6/6] chance method name to be more descriptive --- adafruit_minimqtt/adafruit_minimqtt.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 7acace13..d64e9411 100755 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -203,8 +203,8 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - def _get_socket(self, host, port, *, timeout=1): - """Obtains and connects a new socket to a host. + def _get_connect_socket(self, host, port, *, timeout=1): + """Obtains a new socket and connects to a broker. :param str host: Desired broker hostname :param int port: Desired broker port :param int timeout: Desired socket timeout @@ -214,7 +214,7 @@ def _get_socket(self, host, port, *, timeout=1): self._sock.close() self._sock = None - # Legacy API - use a default socket instead of socket pool + # Legacy API - use the interface's socket instead of a passed socket pool if self._socket_pool is None: self._socket_pool = _default_sock @@ -222,7 +222,7 @@ def _get_socket(self, host, port, *, timeout=1): if self._ssl_context is None: self._ssl_context = _fake_context - if port == 8883 and self._ssl_context is None: + if port == 8883 and not self._ssl_context: raise RuntimeError( "ssl_context must be set before using adafruit_mqtt for secure MQTT." ) @@ -425,7 +425,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None): self.logger.debug("Attempting to establish MQTT connection...") # Get a new socket - self._sock = self._get_socket(self.broker, self.port) + self._sock = self._get_connect_socket(self.broker, self.port) # Fixed Header fixed_header = bytearray([0x10])