Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Commit

Permalink
Asyncio context manager with contextvars. Add different versions of t…
Browse files Browse the repository at this point in the history
…ornado to travis.yml.
  • Loading branch information
condorcet committed May 6, 2019
1 parent 35a7bc9 commit a3bc006
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 69 deletions.
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ python:
- "3.7"
- "3.8-dev"

env:
- TORNADO=">=4,<5"
- TORNADO=">=5,<6"
- TORNADO=">=6"

matrix:
allow_failures:
- python: "3.8-dev"
exclude:
- python: "2.7"
env: TORNADO=">=6"

install:
- make bootstrap
- pip install -q "tornado$TORNADO"

script:
- make test testbed lint
5 changes: 5 additions & 0 deletions opentracing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
# limitations under the License.

from __future__ import absolute_import
try:
# Contextvars backport with coroutine supporting (python 3.6).
import aiocontextvars # noqa
except ImportError:
pass
from .span import Span # noqa
from .span import SpanContext # noqa
from .scope import Scope # noqa
Expand Down
71 changes: 18 additions & 53 deletions opentracing/scope_managers/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,29 @@

from __future__ import absolute_import

import asyncio
from contextvars import ContextVar

from opentracing import Scope
from opentracing.scope_managers import ThreadLocalScopeManager
from .constants import ACTIVE_ATTR


_SCOPE = ContextVar('scope')


class AsyncioScopeManager(ThreadLocalScopeManager):
"""
:class:`~opentracing.ScopeManager` implementation for **asyncio**
that stores the :class:`~opentracing.Scope` in the current
:class:`Task` (:meth:`Task.current_task()`), falling back to
thread-local storage if none was being executed.
that stores the :class:`~opentracing.Scope` using ContextVar.
Automatic :class:`~opentracing.Span` propagation from
parent coroutines to their children is not provided, which needs to be
done manually:
The scope manager provides automatic :class:`~opentracing.Span` propagation
from parent coroutines to their children.
.. code-block:: python
async def child_coroutine(span):
# activate the parent Span, but do not finish it upon
# deactivation. That will be done by the parent coroutine.
with tracer.scope_manager.activate(span, finish_on_close=False):
with tracer.start_active_span('child') as scope:
...
# No need manual activation of parent span in child coroutine.
with tracer.start_active_span('child') as scope:
...
async def parent_coroutine():
with tracer.start_active_span('parent') as scope:
Expand All @@ -63,24 +60,13 @@ def activate(self, span, finish_on_close):
:param finish_on_close: whether *span* should automatically be
finished when :meth:`Scope.close()` is called.
If no :class:`Task` is being executed, thread-local
storage will be used to store the :class:`~opentracing.Scope`.
:return: a :class:`~opentracing.Scope` instance to control the end
of the active period for the :class:`~opentracing.Span`.
It is a programming error to neglect to call :meth:`Scope.close()`
on the returned instance.
"""

task = self._get_task()
if not task:
return super(AsyncioScopeManager, self).activate(span,
finish_on_close)

scope = _AsyncioScope(self, span, finish_on_close)
self._set_task_scope(scope, task)

return scope
return self._set_scope(span, finish_on_close)

@property
def active(self):
Expand All @@ -93,46 +79,25 @@ def active(self):
or ``None`` if not available.
"""

task = self._get_task()
if not task:
return super(AsyncioScopeManager, self).active
return self._get_scope()

return self._get_task_scope(task)
def _set_scope(self, span, finish_on_close):
return _AsyncioScope(self, span, finish_on_close)

def _get_task(self):
try:
# Prevent failure when run from a thread
# without an event loop.
loop = asyncio.get_event_loop()
except RuntimeError:
return None

return asyncio.Task.current_task(loop=loop)

def _set_task_scope(self, scope, task=None):
if task is None:
task = self._get_task()

setattr(task, ACTIVE_ATTR, scope)

def _get_task_scope(self, task=None):
if task is None:
task = self._get_task()

return getattr(task, ACTIVE_ATTR, None)
def _get_scope(self):
return _SCOPE.get(None)


class _AsyncioScope(Scope):
def __init__(self, manager, span, finish_on_close):
super(_AsyncioScope, self).__init__(manager, span)
self._finish_on_close = finish_on_close
self._to_restore = manager.active
self._token = _SCOPE.set(self)

def close(self):
if self.manager.active is not self:
return

self.manager._set_task_scope(self._to_restore)
_SCOPE.reset(self._token)

if self._finish_on_close:
self.span.finish()
7 changes: 5 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
include_package_data=True,
zip_safe=False,
platforms='any',
install_requires=[
'futures;python_version=="2.7"',
'aiocontextvars;python_version>="3.5"',
],
extras_require={
'tests': [
'doubles',
Expand All @@ -40,8 +44,7 @@

'six>=1.10.0,<2.0',
'gevent',
'tornado<6',
'tornado',
],
':python_version == "2.7"': ['futures'],
},
)
9 changes: 7 additions & 2 deletions testbed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ Alternatively, due to the organization of the suite, it's possible to run direct

## Tested frameworks

Currently the examples cover `threading`, `tornado`, `gevent` and `asyncio` (which requires Python 3). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.
Currently the examples cover from ..utils import get_one_by_operation_name, stop_loop_when
`threading`, `tornado`, `gevent` and `asyncio` (which requires Python 3). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.

### threading, asyncio and gevent
### threading and gevent

No automatic `Span` propagation between parent and children tasks is provided, and thus the `Span` need to be manually passed down the chain.

### asyncio

`AsyncioScopeManager` supports automatically propagate the context from parent coroutines to their children. For compatibility reasons with previous version of `AsyncioScopeManager`, asyncio testbed contains test cases showing that manual activation of parent span in child span also works as expected.

### tornado

`TornadoScopeManager` uses a variation of `tornado.stack_context.StackContext` to both store **and** automatically propagate the context from parent coroutines to their children.
Expand Down
12 changes: 10 additions & 2 deletions testbed/__main__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from importlib import import_module
import logging
import os
import sys
import six
import unittest
from tornado import version_info as tornado_version


enabled_platforms = [
'threads',
'tornado',
'gevent',
]
if tornado_version < (6, 0, 0, 0):
# Including testbed for Tornado coroutines and stack context.
# We don't need run testbed in case Tornado>=6, because it became
# asyncio-based framework and `stack_context` was deprecated.
enabled_platforms.append('tornado')
if six.PY3:
enabled_platforms.append('asyncio')

Expand Down Expand Up @@ -47,4 +53,6 @@ def get_test_directories():
suite = loader.loadTestsFromModule(test_module)
main_suite.addTests(suite)

unittest.TextTestRunner(verbosity=3).run(main_suite)
result = unittest.TextTestRunner(verbosity=3).run(main_suite)
if result.failures or result.errors:
sys.exit(1)
19 changes: 16 additions & 3 deletions testbed/test_late_span_finish/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ def setUp(self):

def test_main(self):
# Create a Span and use it as (explicit) parent of a pair of subtasks.
parent_span = self.tracer.start_span('parent')
parent_scope = self.tracer.start_active_span('parent')
parent_span = parent_scope.span
self.submit_subtasks(parent_span)

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 2)
self.loop.run_forever()

# Late-finish the parent Span now.
parent_span.finish()
parent_scope.close()

spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 3)
Expand All @@ -44,7 +45,19 @@ async def task(name):
logger.info('Running %s' % name)
with self.tracer.scope_manager.activate(parent_span, False):
with self.tracer.start_active_span(name):
asyncio.sleep(0.1)
await asyncio.sleep(0.1)

self.loop.create_task(task('task1'))
self.loop.create_task(task('task2'))


class TestAutoContextPropagationAsyncio(TestAsyncio):

def submit_subtasks(self, parent_span):
async def task(name):
logger.info('Running %s' % name)
with self.tracer.start_active_span(name):
await asyncio.sleep(0.1)

self.loop.create_task(task('task1'))
self.loop.create_task(task('task2'))
8 changes: 8 additions & 0 deletions testbed/test_multiple_callbacks/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,11 @@ def submit_callbacks(self):
tasks.append(t)

return tasks


class TestAutoContextPropagationAsyncio(TestAsyncio):

async def task(self, interval, parent_span):
logger.info('Starting task')
with self.tracer.start_active_span('task'):
await asyncio.sleep(interval)
22 changes: 22 additions & 0 deletions testbed/test_nested_callbacks/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,25 @@ async def task3():
self.loop.create_task(task2())

self.loop.create_task(task1())


class TestAutoContextPropagationAsyncio(TestAsyncio):

def submit(self):
span = self.tracer.scope_manager.active.span

async def task1():
span.set_tag('key1', '1')

async def task2():
span.set_tag('key2', '2')

async def task3():
span.set_tag('key3', '3')
span.finish()

self.loop.create_task(task3())

self.loop.create_task(task2())

self.loop.create_task(task1())
9 changes: 7 additions & 2 deletions testbed/test_subtask_span_propagation/test_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import, print_function

import functools

import asyncio

from opentracing.mocktracer import MockTracer
Expand Down Expand Up @@ -33,3 +31,10 @@ async def child_task(self, message, span):
with self.tracer.scope_manager.activate(span, False):
with self.tracer.start_active_span('child'):
return '%s::response' % message


class TestAutoContextPropagationAsyncio(TestAsyncio):

async def child_task(self, message, span):
with self.tracer.start_active_span('child'):
return '%s::response' % message
16 changes: 11 additions & 5 deletions tests/scope_managers/test_tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
# THE SOFTWARE.

from __future__ import absolute_import

import pytest
from unittest import TestCase

from tornado import ioloop

from opentracing.scope_managers.tornado import TornadoScopeManager
from opentracing.scope_managers.tornado import tracer_stack_context
from tornado import ioloop, version_info
try:
from opentracing.scope_managers.tornado import TornadoScopeManager
from opentracing.scope_managers.tornado import tracer_stack_context
except ImportError:
pass
from opentracing.harness.scope_check import ScopeCompatibilityCheckMixin


# We don't need run tests in case Tornado>=6, because it became
# asyncio-based framework and `stack_context` was deprecated.
@pytest.mark.skipif(version_info >= (6, 0, 0, 0),
reason='skip Tornado >= 6')
class TornadoCompabilityCheck(TestCase, ScopeCompatibilityCheckMixin):
def scope_manager(self):
return TornadoScopeManager()
Expand Down

0 comments on commit a3bc006

Please sign in to comment.