Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBMQTT broker inside a thread #211

Open
interkosmos opened this issue Mar 17, 2020 · 2 comments
Open

HBMQTT broker inside a thread #211

interkosmos opened this issue Mar 17, 2020 · 2 comments

Comments

@interkosmos
Copy link

interkosmos commented Mar 17, 2020

How can I run an HBMQTT broker instance inside a Python thread? The following minimal example doesn’t work:

import asyncio
from threading import Thread
from hbmqtt.broker import Broker

class ExampleThread(Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.config = {
            'listeners': {
                'default': {
                    'max-connections': 50000,
                    'bind': 'localhost:1883',
                    'type': 'tcp',
                },
            },
            'auth': {
                'allow-anonymous': True,
            },
            'plugins': [ 'auth_anonymous' ],
            'topic-check': {
                'enabled': False
            }
        }
        self.loop = None
        self.broker = None

    async def broker_coroutine(self):
        self.broker = Broker(self.config, self.loop)
        await self.broker.start()
        return self.broker

    async def test_coroutine(self):
        while True:
            await asyncio.sleep(1)
            print('hey!')

    def run(self) -> None:
        print('running ...')
        self.loop.run_forever()
        self.loop.run_until_complete(self.broker.shutdown())
        self.loop.close()

    def start(self):
        print('starting thread ...')
        self.loop = asyncio.new_event_loop()
        print('starting server ...')
        try:
            start_server = asyncio.gather(self.broker_coroutine(),
                                          loop=self.loop)
            self.loop.run_until_complete(start_server)
            broker = start_server.result()[0]
        except KeyboardInterrupt as e:
            self.loop.close()
        except:
            print(traceback.format_exc())
            self.loop.close()

        super().__init__()


if __name__ == '__main__':
    thread = ExampleThread()
    thread.start()

This basic example crashes:

$ python3.7 ./mqtt.py
starting thread ...
starting server ...
Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at venv/lib/python3.7/site-packages/hbmqtt/broker.py:696> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x8027fd890>()]>>
Exception ignored in: <generator object Broker._broadcast_loop at 0x8027d1ed0>
Traceback (most recent call last):
  File "venv/lib/python3.7/site-packages/hbmqtt/broker.py", line 696, in _broadcast_loop
  File "/usr/local/lib/python3.7/asyncio/queues.py", line 161, in get
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 687, in call_soon
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 479, in _check_closed
RuntimeError: Event loop is closed

But running the coroutine test_coroutine() instead just works as intended. If asyncio.set_event_loop(self.loop) is added the thread starts but immediately finishes:

    def start(self):
        print('starting thread ...')
        self.loop = asyncio.new_event_loop()
        # Setting the event loop:
        asyncio.set_event_loop(self.loop)
        print('starting server ...')
        # [...]

Output:

$ python3.7 ./mqtt.py
starting thread ...
starting server ...
starting coroutine ...
@FlorianLudwig
Copy link
Contributor

@interkosmos mixing threads with asyncio is generally not recommended.

If you need to (for example you have a not async written code or something cpu heavy) have a look at ProcessPoolExecutor or ThreadPoolExecutor. They allow you to run code in parallel to async code but keep the async loop in control.

@interkosmos
Copy link
Author

You seem to be right. I couldn’t get it running inside a Thread class, but it works using:

    def run(self, loop):
        try:
            future = asyncio.gather(self.broker_coroutine(self.config, loop),
                                    loop=loop)
            loop.run_until_complete(future)
            loop.run_forever()
            loop.run_until_complete(self.broker.shutdown())
            loop.close()
        except KeyboardInterrupt:
            loop.close()
        except:
            loop.close()

    def start(self):
        loop = asyncio.new_event_loop()
        thread = Thread(target=lambda: self.run(loop), daemon=True)
        thread.start()

Therefore, it’s a Python and not an HBMQTT issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants