diff --git a/aiodocker/docker.py b/aiodocker/docker.py index 9e0ae833b..9cd2d4ebb 100644 --- a/aiodocker/docker.py +++ b/aiodocker/docker.py @@ -485,13 +485,16 @@ def __init__(self, docker): self.docker = docker self.channel = Channel() self.json_stream = None + self.task = None def listen(self): warnings.warn("use subscribe() method instead", DeprecationWarning, stacklevel=2) return self.channel.subscribe() - def subscribe(self): + def subscribe(self, create_task=True): + if create_task: + self.task = self.docker.loop.create_task(self.run()) return self.channel.subscribe() def _transform_event(self, data): @@ -531,8 +534,14 @@ async def run(self, **params): self.json_stream = None async def stop(self): - if self.json_stream: + if self.json_stream is not None: await self.json_stream.close() + if self.task: + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + pass class DockerLog: diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 000000000..ebcbbc8a8 --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,22 @@ +import asyncio + +import pytest + + +def test_events_default_task(docker): + docker.events.subscribe() + assert docker.events.task is not None + docker.loop.run_until_complete(docker.events.stop()) + assert docker.events.task.done() + assert docker.events.json_stream is None + + +def test_events_provided_task(docker): + task = docker.loop.create_task(docker.events.run()) + docker.events.subscribe(create_task=False) + assert docker.events.task is None + docker.loop.run_until_complete(docker.events.stop()) + assert docker.events.json_stream is None + task.cancel() + with pytest.raises(asyncio.CancelledError): + docker.loop.run_until_complete(task) diff --git a/tests/test_integration.py b/tests/test_integration.py index d18fad132..78efb835c 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -204,7 +204,6 @@ async def test_port(docker, testing_images, redis_container): @pytest.mark.asyncio async def test_events(docker, testing_images, event_loop): - monitor_task = event_loop.create_task(docker.events.run()) subscriber = docker.events.subscribe() # Do some stuffs to generate events. @@ -232,4 +231,3 @@ async def test_events(docker, testing_images, event_loop): break assert events_occurred == ['create', 'start', 'kill', 'die', 'destroy'] - monitor_task.cancel()