diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 4198abc3..c0907007 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -3,7 +3,7 @@ Authors * Frédéric GARDES * Nicolas BUFFON -* Yann MORIN -* Mathieu LEFEBVRE +* Mathieu LEFEBVRE +* François SUC +* Pierre-Yves LAPERSONNE diff --git a/python/iot3/pyproject.toml b/python/iot3/pyproject.toml index 6577d99d..15ecc6be 100644 --- a/python/iot3/pyproject.toml +++ b/python/iot3/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ dependencies = [ "paho-mqtt>=2.1.0", "requests>=2.31.0", - "its-quadkeys @ git+https://github.com/Orange-OpenSource/its-client@2083d20a59c5191a1258ece823c0741fce672443#subdirectory=python/its-quadkeys" + "its-quadkeys @ git+https://github.com/Orange-OpenSource/its-client@4ef72eaabacc7504640becb86974a2cfbf9846b3#subdirectory=python/its-quadkeys" ] [project.urls] diff --git a/python/iot3/src/iot3/core/__init__.py b/python/iot3/src/iot3/core/__init__.py index 340b5778..115a678f 100644 --- a/python/iot3/src/iot3/core/__init__.py +++ b/python/iot3/src/iot3/core/__init__.py @@ -156,24 +156,21 @@ def bootstrap( # can be prefixed wth "internal-"... Sigh... :-( otlp_proto = "internal-" + otlp_proto if otlp_proto in bootstrap["protocols"]: + config["otel"] = { + "endpoint": bootstrap["protocols"][otlp_proto], + # In practice, there will *always* be credentials provided in + # the bootstrap response, so we'll always have authentication + # and we know the backend only implements BasicAuth. Prove me + # wrong! ;-) + "auth": _otel.Auth.BASIC, + "username": bootstrap["psk_run_login"], + "password": bootstrap["psk_run_password"], + "service_name": service_name, + "batch_period": 5, + "max_backlog": 100, + "compression": "gzip", + } break - else: - raise RuntimeError("No known OTLP protocol available") - - config["otel"] = { - "endpoint": bootstrap["protocols"][otlp_proto], - # In practice, there will *always* be credentials provided in - # the bootstrap response, so we'll always have authentication - # and we know the backend only implements BasicAuth. Prove me - # wrong! ;-) - "auth": _otel.Auth.BASIC, - "username": bootstrap["psk_run_login"], - "password": bootstrap["psk_run_password"], - "service_name": service_name, - "batch_period": 5, - "max_backlog": 100, - "compression": "gzip", - } return config @@ -201,33 +198,29 @@ def start( if _core is not None: raise RuntimeError("IoT3 Core SDK already intialised.") + _core = dict() o_kwargs = dict() - for auth in _otel.Auth: - if config["otel"]["auth"] == auth.value: - o_kwargs["auth"] = auth - if auth != otel.Auth.NONE: - o_kwargs["username"] = config["otel"]["username"] - o_kwargs["password"] = config["otel"]["password"] - break - else: - raise ValueError(f"unknown authentication {config['otel']['auth']}") - - for comp in _otel.Compression: - if config["otel"]["compression"] == comp.value: - o_kwargs["compression"] = comp - break + if "otel" in config: + o_kwargs["auth"] = _otel.Auth(config["otel"]["auth"]) + if o_kwargs["auth"] != _otel.Auth.NONE: + o_kwargs["username"] = config["otel"]["username"] + o_kwargs["password"] = config["otel"]["password"] + + o_kwargs["compression"] = _otel.Compression(config["otel"]["compression"]) + + o = _otel.Otel( + service_name=config["otel"]["service_name"], + endpoint=config["otel"]["endpoint"], + batch_period=config["otel"]["batch_period"], + max_backlog=config["otel"]["max_backlog"], + **o_kwargs, + ) + o.start() + _core["otel"] = o + span_ctxmgr = o.span else: - raise ValueError(f"unknown compression {config['otel']['compression']}") - - o = _otel.Otel( - service_name=config["otel"]["service_name"], - endpoint=config["otel"]["endpoint"], - batch_period=config["otel"]["batch_period"], - max_backlog=config["otel"]["max_backlog"], - **o_kwargs, - ) - o.start() + span_ctxmgr = None # Wrap the callback to avoid leaking telemetry into the simple API def _msg_cb(*, data, topic, payload, **_kwargs): @@ -241,11 +234,11 @@ def _msg_cb(*, data, topic, payload, **_kwargs): password=config["mqtt"]["password"], msg_cb=_msg_cb if message_callback else None, msg_cb_data=callback_data, - span_ctxmgr_cb=o.span, + span_ctxmgr_cb=span_ctxmgr, ) m.start() - _core = dict([("otel", o), ("mqtt", m)]) + _core["mqtt"] = m def stop(): @@ -256,7 +249,8 @@ def stop(): raise RuntimeError("IoT3 Core SDK not initialised.") _core["mqtt"].stop() - _core["otel"].stop() + if "otel" in _core: + _core["otel"].stop() _core = None @@ -276,7 +270,7 @@ def is_ready() -> bool: def wait_for_ready(): - """Wait until the IoT2 Core SDK is ready. + """Wait until the IoT3 Core SDK is ready. Beware that this may take an indeterminate amount of time; in case the MQTT client can't connect at all, wait_for_ready() diff --git a/python/iot3/src/iot3/core/mqtt.py b/python/iot3/src/iot3/core/mqtt.py index 4f266bbc..11418ef7 100644 --- a/python/iot3/src/iot3/core/mqtt.py +++ b/python/iot3/src/iot3/core/mqtt.py @@ -32,7 +32,7 @@ def __init__( password: Optional[str] = None, msg_cb: Optional[MsgCallbackType] = None, msg_cb_data: Any = None, - span_ctxmgr_cb: Optional[SpanCallableType] = otel.Otel.noexport_span, + span_ctxmgr_cb: Optional[SpanCallableType] = None, ): """ Create an MQTT client @@ -121,7 +121,7 @@ def my_cb( if tls is None: tls = port != 1883 - self.span_ctxmgr_cb = span_ctxmgr_cb + self.span_ctxmgr_cb = span_ctxmgr_cb or otel.Otel.noexport_span self.client = paho.mqtt.client.Client( callback_api_version=paho.mqtt.enums.CallbackAPIVersion.VERSION2, diff --git a/python/iot3/tests/test-iot3-core b/python/iot3/tests/test-iot3-core index 968fbb57..3417beb0 100755 --- a/python/iot3/tests/test-iot3-core +++ b/python/iot3/tests/test-iot3-core @@ -6,9 +6,11 @@ import time def recv(data, topic, payload): - print(f"{topic[:16]}: {payload[:16]}") + print(f"{topic}: {payload[:16]}") +print("IoT3 Core SDK with MQTT and OTLP") + config = iot3.core.sample_config config["mqtt"] = { "host": "test.mosquitto.org", @@ -34,3 +36,25 @@ iot3.core.publish(f"{topic}/passed", "passed") time.sleep(1) iot3.core.stop() + + +print("IoT3 Core SDK with MQTT and no OTLP") + +del config["otel"] + +topic = "test/" + random.randbytes(16).hex() + "/iot3/no-otlp" + +iot3.core.start( + config=config, + message_callback=recv, +) + +iot3.core.subscribe(f"{topic}/+") + +iot3.core.publish(f"{topic}/dropped", "dropped") +time.sleep(1) + +iot3.core.publish(f"{topic}/passed", "passed") +time.sleep(1) + +iot3.core.stop()