From 72862c91a0a12a7420c6bb12a296d9b052ef8686 Mon Sep 17 00:00:00 2001 From: alrex Date: Thu, 13 Feb 2020 15:18:39 -0800 Subject: [PATCH] Adding Context API (#395) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change implements the Context API portion of OTEP #66. The CorrelationContext API and Propagation API changes will come in future PRs. We're leveraging entrypoints to support other implementations of the Context API if/when necessary. For backwards compatibility, this change uses aiocontextvars for Python versions older than 3.7. Co-authored-by: Diego Hurtado Co-authored-by: Mauricio Vásquez --- ....rst => opentelemetry.context.context.rst} | 2 +- docs/opentelemetry.context.rst | 2 +- .../ext/http_requests/__init__.py | 5 +- opentelemetry-api/setup.py | 7 + .../src/opentelemetry/context/__init__.py | 254 ++++++++---------- .../opentelemetry/context/async_context.py | 45 ---- .../src/opentelemetry/context/base_context.py | 130 --------- .../src/opentelemetry/context/context.py | 44 +++ .../opentelemetry/context/default_context.py | 34 +++ .../context/thread_local_context.py | 45 ---- .../distributedcontext/__init__.py | 22 +- .../trace/propagation/__init__.py | 26 ++ .../tests/context/test_context.py | 65 +++++ opentelemetry-sdk/setup.py | 12 +- .../sdk/context/aiocontextvarsfix.py | 86 ++++++ .../sdk/context/contextvars_context.py | 48 ++++ .../sdk/context/threadlocal_context.py | 44 +++ .../sdk/distributedcontext/__init__.py | 27 +- .../src/opentelemetry/sdk/trace/__init__.py | 17 +- .../sdk/trace/export/__init__.py | 36 +-- opentelemetry-sdk/tests/conftest.py | 30 +++ .../tests/context/test_asyncio.py | 154 +++++++++++ .../tests/context/test_context.py | 63 +++++ .../tests/context/test_threads.py | 87 ++++++ tox.ini | 1 + 25 files changed, 879 insertions(+), 407 deletions(-) rename docs/{opentelemetry.context.base_context.rst => opentelemetry.context.context.rst} (73%) delete mode 100644 opentelemetry-api/src/opentelemetry/context/async_context.py delete mode 100644 opentelemetry-api/src/opentelemetry/context/base_context.py create mode 100644 opentelemetry-api/src/opentelemetry/context/context.py create mode 100644 opentelemetry-api/src/opentelemetry/context/default_context.py delete mode 100644 opentelemetry-api/src/opentelemetry/context/thread_local_context.py create mode 100644 opentelemetry-api/src/opentelemetry/trace/propagation/__init__.py create mode 100644 opentelemetry-api/tests/context/test_context.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/context/aiocontextvarsfix.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/context/contextvars_context.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/context/threadlocal_context.py create mode 100644 opentelemetry-sdk/tests/conftest.py create mode 100644 opentelemetry-sdk/tests/context/test_asyncio.py create mode 100644 opentelemetry-sdk/tests/context/test_context.py create mode 100644 opentelemetry-sdk/tests/context/test_threads.py diff --git a/docs/opentelemetry.context.base_context.rst b/docs/opentelemetry.context.context.rst similarity index 73% rename from docs/opentelemetry.context.base_context.rst rename to docs/opentelemetry.context.context.rst index ac28d40008e..331557d2dde 100644 --- a/docs/opentelemetry.context.base_context.rst +++ b/docs/opentelemetry.context.context.rst @@ -1,7 +1,7 @@ opentelemetry.context.base\_context module ========================================== -.. automodule:: opentelemetry.context.base_context +.. automodule:: opentelemetry.context.context :members: :undoc-members: :show-inheritance: diff --git a/docs/opentelemetry.context.rst b/docs/opentelemetry.context.rst index 7bc738a0500..2b25793458c 100644 --- a/docs/opentelemetry.context.rst +++ b/docs/opentelemetry.context.rst @@ -6,7 +6,7 @@ Submodules .. toctree:: - opentelemetry.context.base_context + opentelemetry.context.context Module contents --------------- diff --git a/ext/opentelemetry-ext-http-requests/src/opentelemetry/ext/http_requests/__init__.py b/ext/opentelemetry-ext-http-requests/src/opentelemetry/ext/http_requests/__init__.py index 4f5a18cf9ea..a557e6fc453 100644 --- a/ext/opentelemetry-ext-http-requests/src/opentelemetry/ext/http_requests/__init__.py +++ b/ext/opentelemetry-ext-http-requests/src/opentelemetry/ext/http_requests/__init__.py @@ -22,8 +22,7 @@ from requests.sessions import Session -from opentelemetry import propagators -from opentelemetry.context import Context +from opentelemetry import context, propagators from opentelemetry.ext.http_requests.version import __version__ from opentelemetry.trace import SpanKind @@ -54,7 +53,7 @@ def enable(tracer_source): @functools.wraps(wrapped) def instrumented_request(self, method, url, *args, **kwargs): - if Context.suppress_instrumentation: + if context.get_value("suppress_instrumentation"): return wrapped(self, method, url, *args, **kwargs) # See diff --git a/opentelemetry-api/setup.py b/opentelemetry-api/setup.py index ee8adf26aeb..fad86f171b8 100644 --- a/opentelemetry-api/setup.py +++ b/opentelemetry-api/setup.py @@ -56,4 +56,11 @@ "/tree/master/opentelemetry-api" ), zip_safe=False, + entry_points={ + "opentelemetry_context": [ + "default_context = " + "opentelemetry.context.default_context:" + "DefaultRuntimeContext", + ] + }, ) diff --git a/opentelemetry-api/src/opentelemetry/context/__init__.py b/opentelemetry-api/src/opentelemetry/context/__init__.py index 43a7722f885..63de570abc2 100644 --- a/opentelemetry-api/src/opentelemetry/context/__init__.py +++ b/opentelemetry-api/src/opentelemetry/context/__init__.py @@ -12,141 +12,119 @@ # See the License for the specific language governing permissions and # limitations under the License. - -""" -The OpenTelemetry context module provides abstraction layer on top of -thread-local storage and contextvars. The long term direction is to switch to -contextvars provided by the Python runtime library. - -A global object ``Context`` is provided to access all the context related -functionalities:: - - >>> from opentelemetry.context import Context - >>> Context.foo = 1 - >>> Context.foo = 2 - >>> Context.foo - 2 - -When explicit thread is used, a helper function -``Context.with_current_context`` can be used to carry the context across -threads:: - - from threading import Thread - from opentelemetry.context import Context - - def work(name): - print('Entering worker:', Context) - Context.operation_id = name - print('Exiting worker:', Context) - - if __name__ == '__main__': - print('Main thread:', Context) - Context.operation_id = 'main' - - print('Main thread:', Context) - - # by default context is not propagated to worker thread - thread = Thread(target=work, args=('foo',)) - thread.start() - thread.join() - - print('Main thread:', Context) - - # user can propagate context explicitly - thread = Thread( - target=Context.with_current_context(work), - args=('bar',), - ) - thread.start() - thread.join() - - print('Main thread:', Context) - -Here goes another example using thread pool:: - - import time - import threading - - from multiprocessing.dummy import Pool as ThreadPool - from opentelemetry.context import Context - - _console_lock = threading.Lock() - - def println(msg): - with _console_lock: - print(msg) - - def work(name): - println('Entering worker[{}]: {}'.format(name, Context)) - Context.operation_id = name - time.sleep(0.01) - println('Exiting worker[{}]: {}'.format(name, Context)) - - if __name__ == "__main__": - println('Main thread: {}'.format(Context)) - Context.operation_id = 'main' - pool = ThreadPool(2) # create a thread pool with 2 threads - pool.map(Context.with_current_context(work), [ - 'bear', - 'cat', - 'dog', - 'horse', - 'rabbit', - ]) - pool.close() - pool.join() - println('Main thread: {}'.format(Context)) - -Here goes a simple demo of how async could work in Python 3.7+:: - - import asyncio - - from opentelemetry.context import Context - - class Span(object): - def __init__(self, name): - self.name = name - self.parent = Context.current_span - - def __repr__(self): - return ('{}(name={}, parent={})' - .format( - type(self).__name__, - self.name, - self.parent, - )) - - async def __aenter__(self): - Context.current_span = self - - async def __aexit__(self, exc_type, exc, tb): - Context.current_span = self.parent - - async def main(): - print(Context) - async with Span('foo'): - print(Context) - await asyncio.sleep(0.1) - async with Span('bar'): - print(Context) - await asyncio.sleep(0.1) - print(Context) - await asyncio.sleep(0.1) - print(Context) - - if __name__ == '__main__': - asyncio.run(main()) -""" - -from .base_context import BaseRuntimeContext - -__all__ = ["Context"] - -try: - from .async_context import AsyncRuntimeContext - - Context = AsyncRuntimeContext() # type: BaseRuntimeContext -except ImportError: - from .thread_local_context import ThreadLocalRuntimeContext - - Context = ThreadLocalRuntimeContext() +import logging +import typing +from os import environ + +from pkg_resources import iter_entry_points + +from opentelemetry.context.context import Context, RuntimeContext + +logger = logging.getLogger(__name__) +_RUNTIME_CONTEXT = None # type: typing.Optional[RuntimeContext] + + +def get_value(key: str, context: typing.Optional[Context] = None) -> "object": + """To access the local state of a concern, the RuntimeContext API + provides a function which takes a context and a key as input, + and returns a value. + + Args: + key: The key of the value to retrieve. + context: The context from which to retrieve the value, if None, the current context is used. + """ + return context.get(key) if context is not None else get_current().get(key) + + +def set_value( + key: str, value: "object", context: typing.Optional[Context] = None +) -> Context: + """To record the local state of a cross-cutting concern, the + RuntimeContext API provides a function which takes a context, a + key, and a value as input, and returns an updated context + which contains the new value. + + Args: + key: The key of the entry to set + value: The value of the entry to set + context: The context to copy, if None, the current context is used + """ + if context is None: + context = get_current() + new_values = context.copy() + new_values[key] = value + return Context(new_values) + + +def remove_value( + key: str, context: typing.Optional[Context] = None +) -> Context: + """To remove a value, this method returns a new context with the key + cleared. Note that the removed value still remains present in the old + context. + + Args: + key: The key of the entry to remove + context: The context to copy, if None, the current context is used + """ + if context is None: + context = get_current() + new_values = context.copy() + new_values.pop(key, None) + return Context(new_values) + + +def get_current() -> Context: + """To access the context associated with program execution, + the RuntimeContext API provides a function which takes no arguments + and returns a RuntimeContext. + """ + + global _RUNTIME_CONTEXT # pylint: disable=global-statement + if _RUNTIME_CONTEXT is None: + # FIXME use a better implementation of a configuration manager to avoid having + # to get configuration values straight from environment variables + + configured_context = environ.get( + "OPENTELEMETRY_CONTEXT", "default_context" + ) # type: str + try: + _RUNTIME_CONTEXT = next( + iter_entry_points("opentelemetry_context", configured_context) + ).load()() + except Exception: # pylint: disable=broad-except + logger.error("Failed to load context: %s", configured_context) + + return _RUNTIME_CONTEXT.get_current() # type:ignore + + +def set_current(context: Context) -> Context: + """To associate a context with program execution, the Context + API provides a function which takes a Context. + + Args: + context: The context to use as current. + """ + old_context = get_current() + _RUNTIME_CONTEXT.set_current(context) # type:ignore + return old_context + + +def with_current_context( + func: typing.Callable[..., "object"] +) -> typing.Callable[..., "object"]: + """Capture the current context and apply it to the provided func.""" + + caller_context = get_current() + + def call_with_current_context( + *args: "object", **kwargs: "object" + ) -> "object": + try: + backup = get_current() + set_current(caller_context) + return func(*args, **kwargs) + finally: + set_current(backup) + + return call_with_current_context diff --git a/opentelemetry-api/src/opentelemetry/context/async_context.py b/opentelemetry-api/src/opentelemetry/context/async_context.py deleted file mode 100644 index 267059fb31a..00000000000 --- a/opentelemetry-api/src/opentelemetry/context/async_context.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -try: - from contextvars import ContextVar -except ImportError: - pass -else: - import typing # pylint: disable=unused-import - from . import base_context - - class AsyncRuntimeContext(base_context.BaseRuntimeContext): - class Slot(base_context.BaseRuntimeContext.Slot): - def __init__(self, name: str, default: object): - # pylint: disable=super-init-not-called - self.name = name - self.contextvar = ContextVar(name) # type: ContextVar[object] - self.default = base_context.wrap_callable( - default - ) # type: typing.Callable[..., object] - - def clear(self) -> None: - self.contextvar.set(self.default()) - - def get(self) -> object: - try: - return self.contextvar.get() - except LookupError: - value = self.default() - self.set(value) - return value - - def set(self, value: object) -> None: - self.contextvar.set(value) diff --git a/opentelemetry-api/src/opentelemetry/context/base_context.py b/opentelemetry-api/src/opentelemetry/context/base_context.py deleted file mode 100644 index 99d6869dd52..00000000000 --- a/opentelemetry-api/src/opentelemetry/context/base_context.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -import typing -from contextlib import contextmanager - - -def wrap_callable(target: "object") -> typing.Callable[[], object]: - if callable(target): - return target - return lambda: target - - -class BaseRuntimeContext: - class Slot: - def __init__(self, name: str, default: "object"): - raise NotImplementedError - - def clear(self) -> None: - raise NotImplementedError - - def get(self) -> "object": - raise NotImplementedError - - def set(self, value: "object") -> None: - raise NotImplementedError - - _lock = threading.Lock() - _slots = {} # type: typing.Dict[str, 'BaseRuntimeContext.Slot'] - - @classmethod - def clear(cls) -> None: - """Clear all slots to their default value.""" - keys = cls._slots.keys() - for name in keys: - slot = cls._slots[name] - slot.clear() - - @classmethod - def register_slot( - cls, name: str, default: "object" = None - ) -> "BaseRuntimeContext.Slot": - """Register a context slot with an optional default value. - - :type name: str - :param name: The name of the context slot. - - :type default: object - :param name: The default value of the slot, can be a value or lambda. - - :returns: The registered slot. - """ - with cls._lock: - if name not in cls._slots: - cls._slots[name] = cls.Slot(name, default) - return cls._slots[name] - - def apply(self, snapshot: typing.Dict[str, "object"]) -> None: - """Set the current context from a given snapshot dictionary""" - - for name in snapshot: - setattr(self, name, snapshot[name]) - - def snapshot(self) -> typing.Dict[str, "object"]: - """Return a dictionary of current slots by reference.""" - - keys = self._slots.keys() - return dict((n, self._slots[n].get()) for n in keys) - - def __repr__(self) -> str: - return "{}({})".format(type(self).__name__, self.snapshot()) - - def __getattr__(self, name: str) -> "object": - if name not in self._slots: - self.register_slot(name, None) - slot = self._slots[name] - return slot.get() - - def __setattr__(self, name: str, value: "object") -> None: - if name not in self._slots: - self.register_slot(name, None) - slot = self._slots[name] - slot.set(value) - - def __getitem__(self, name: str) -> "object": - return self.__getattr__(name) - - def __setitem__(self, name: str, value: "object") -> None: - self.__setattr__(name, value) - - @contextmanager # type: ignore - def use(self, **kwargs: typing.Dict[str, object]) -> typing.Iterator[None]: - snapshot = {key: self[key] for key in kwargs} - for key in kwargs: - self[key] = kwargs[key] - yield - for key in kwargs: - self[key] = snapshot[key] - - def with_current_context( - self, func: typing.Callable[..., "object"] - ) -> typing.Callable[..., "object"]: - """Capture the current context and apply it to the provided func. - """ - - caller_context = self.snapshot() - - def call_with_current_context( - *args: "object", **kwargs: "object" - ) -> "object": - try: - backup_context = self.snapshot() - self.apply(caller_context) - return func(*args, **kwargs) - finally: - self.apply(backup_context) - - return call_with_current_context diff --git a/opentelemetry-api/src/opentelemetry/context/context.py b/opentelemetry-api/src/opentelemetry/context/context.py new file mode 100644 index 00000000000..148312a884c --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/context/context.py @@ -0,0 +1,44 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing +from abc import ABC, abstractmethod + + +class Context(typing.Dict[str, object]): + def __setitem__(self, key: str, value: object) -> None: + raise ValueError + + +class RuntimeContext(ABC): + """The RuntimeContext interface provides a wrapper for the different + mechanisms that are used to propagate context in Python. + Implementations can be made available via entry_points and + selected through environment variables. + """ + + @abstractmethod + def set_current(self, context: Context) -> None: + """ Sets the current `Context` object. + + Args: + context: The Context to set. + """ + + @abstractmethod + def get_current(self) -> Context: + """ Returns the current `Context` object. """ + + +__all__ = ["Context", "RuntimeContext"] diff --git a/opentelemetry-api/src/opentelemetry/context/default_context.py b/opentelemetry-api/src/opentelemetry/context/default_context.py new file mode 100644 index 00000000000..6c83f839d30 --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/context/default_context.py @@ -0,0 +1,34 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from opentelemetry.context.context import Context, RuntimeContext + + +class DefaultRuntimeContext(RuntimeContext): + """A default implementation of the RuntimeContext interface using + a dictionary to store values. + """ + + def __init__(self) -> None: + self._current_context = Context() + + def set_current(self, context: Context) -> None: + """See `opentelemetry.context.RuntimeContext.set_current`.""" + self._current_context = context + + def get_current(self) -> Context: + """See `opentelemetry.context.RuntimeContext.get_current`.""" + return self._current_context + + +__all__ = ["DefaultRuntimeContext"] diff --git a/opentelemetry-api/src/opentelemetry/context/thread_local_context.py b/opentelemetry-api/src/opentelemetry/context/thread_local_context.py deleted file mode 100644 index b60914f846c..00000000000 --- a/opentelemetry-api/src/opentelemetry/context/thread_local_context.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -import typing # pylint: disable=unused-import - -from . import base_context - - -class ThreadLocalRuntimeContext(base_context.BaseRuntimeContext): - class Slot(base_context.BaseRuntimeContext.Slot): - _thread_local = threading.local() - - def __init__(self, name: str, default: "object"): - # pylint: disable=super-init-not-called - self.name = name - self.default = base_context.wrap_callable( - default - ) # type: typing.Callable[..., object] - - def clear(self) -> None: - setattr(self._thread_local, self.name, self.default()) - - def get(self) -> "object": - try: - got = getattr(self._thread_local, self.name) # type: object - return got - except AttributeError: - value = self.default() - self.set(value) - return value - - def set(self, value: "object") -> None: - setattr(self._thread_local, self.name, value) diff --git a/opentelemetry-api/src/opentelemetry/distributedcontext/__init__.py b/opentelemetry-api/src/opentelemetry/distributedcontext/__init__.py index 38ef3739b90..a89d9825502 100644 --- a/opentelemetry-api/src/opentelemetry/distributedcontext/__init__.py +++ b/opentelemetry-api/src/opentelemetry/distributedcontext/__init__.py @@ -17,6 +17,9 @@ import typing from contextlib import contextmanager +from opentelemetry.context import get_value, set_current, set_value +from opentelemetry.context.context import Context + PRINTABLE = frozenset( itertools.chain( string.ascii_letters, string.digits, string.punctuation, " " @@ -100,7 +103,9 @@ def get_entry_value(self, key: EntryKey) -> typing.Optional[EntryValue]: class DistributedContextManager: - def get_current_context(self) -> typing.Optional[DistributedContext]: + def get_current_context( + self, context: typing.Optional[Context] = None + ) -> typing.Optional[DistributedContext]: """Gets the current DistributedContext. Returns: @@ -123,3 +128,18 @@ def use_context( """ # pylint: disable=no-self-use yield context + + +_DISTRIBUTED_CONTEXT_KEY = "DistributedContext" + + +def distributed_context_from_context( + context: typing.Optional[Context] = None, +) -> DistributedContext: + return get_value(_DISTRIBUTED_CONTEXT_KEY, context) # type: ignore + + +def with_distributed_context( + dctx: DistributedContext, context: typing.Optional[Context] = None +) -> None: + set_current(set_value(_DISTRIBUTED_CONTEXT_KEY, dctx, context=context)) diff --git a/opentelemetry-api/src/opentelemetry/trace/propagation/__init__.py b/opentelemetry-api/src/opentelemetry/trace/propagation/__init__.py new file mode 100644 index 00000000000..67d8a76a53e --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/trace/propagation/__init__.py @@ -0,0 +1,26 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +from opentelemetry.trace import INVALID_SPAN_CONTEXT, Span, SpanContext + +_SPAN_CONTEXT_KEY = "extracted-span-context" +_SPAN_KEY = "current-span" + + +def get_span_key(tracer_source_id: Optional[str] = None) -> str: + key = _SPAN_KEY + if tracer_source_id is not None: + key = "{}-{}".format(key, tracer_source_id) + return key diff --git a/opentelemetry-api/tests/context/test_context.py b/opentelemetry-api/tests/context/test_context.py new file mode 100644 index 00000000000..2536e5149be --- /dev/null +++ b/opentelemetry-api/tests/context/test_context.py @@ -0,0 +1,65 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from opentelemetry import context +from opentelemetry.context.context import Context + + +def do_work() -> None: + context.set_current(context.set_value("say", "bar")) + + +class TestContext(unittest.TestCase): + def setUp(self): + context.set_current(Context()) + + def test_context(self): + self.assertIsNone(context.get_value("say")) + empty = context.get_current() + second = context.set_value("say", "foo") + self.assertEqual(context.get_value("say", context=second), "foo") + + do_work() + self.assertEqual(context.get_value("say"), "bar") + third = context.get_current() + + self.assertIsNone(context.get_value("say", context=empty)) + self.assertEqual(context.get_value("say", context=second), "foo") + self.assertEqual(context.get_value("say", context=third), "bar") + + def test_set_value(self): + first = context.set_value("a", "yyy") + second = context.set_value("a", "zzz") + third = context.set_value("a", "---", first) + self.assertEqual("yyy", context.get_value("a", context=first)) + self.assertEqual("zzz", context.get_value("a", context=second)) + self.assertEqual("---", context.get_value("a", context=third)) + self.assertEqual(None, context.get_value("a")) + + def test_context_is_immutable(self): + with self.assertRaises(ValueError): + # ensure a context + context.get_current()["test"] = "cant-change-immutable" + + def test_set_current(self): + context.set_current(context.set_value("a", "yyy")) + + old_context = context.set_current(context.set_value("a", "zzz")) + self.assertEqual("yyy", context.get_value("a", context=old_context)) + self.assertEqual("zzz", context.get_value("a")) + + context.set_current(old_context) + self.assertEqual("yyy", context.get_value("a")) diff --git a/opentelemetry-sdk/setup.py b/opentelemetry-sdk/setup.py index cbfb0f075d4..7e88bb3bfe5 100644 --- a/opentelemetry-sdk/setup.py +++ b/opentelemetry-sdk/setup.py @@ -44,7 +44,7 @@ include_package_data=True, long_description=open("README.rst").read(), long_description_content_type="text/x-rst", - install_requires=["opentelemetry-api==0.4.dev0"], + install_requires=["opentelemetry-api==0.4.dev0", "aiocontextvars"], extras_require={}, license="Apache-2.0", package_dir={"": "src"}, @@ -56,4 +56,14 @@ "/tree/master/opentelemetry-sdk" ), zip_safe=False, + entry_points={ + "opentelemetry_context": [ + "contextvars_context = " + "opentelemetry.sdk.context.contextvars_context:" + "ContextVarsRuntimeContext", + "threadlocal_context = " + "opentelemetry.sdk.context.threadlocal_context:" + "ThreadLocalRuntimeContext", + ] + }, ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/context/aiocontextvarsfix.py b/opentelemetry-sdk/src/opentelemetry/sdk/context/aiocontextvarsfix.py new file mode 100644 index 00000000000..6aa17793788 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/context/aiocontextvarsfix.py @@ -0,0 +1,86 @@ +# type: ignore +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This module is a patch to allow aiocontextvars to work for older versions +# of Python 3.5. It is copied and pasted from: +# https://github.com/fantix/aiocontextvars/issues/88#issuecomment-522276290 + +import asyncio +import asyncio.coroutines +import asyncio.futures +import concurrent.futures + +if not hasattr(asyncio, "_get_running_loop"): + # noinspection PyCompatibility + # pylint:disable=protected-access + import asyncio.events + from threading import local as threading_local + + if not hasattr(asyncio.events, "_get_running_loop"): + + class _RunningLoop(threading_local): + _loop = None + + _running_loop = _RunningLoop() + + def _get_running_loop(): + return _running_loop._loop + + def set_running_loop(loop): # noqa: F811 + _running_loop._loop = loop + + def _get_event_loop(): + current_loop = _get_running_loop() + if current_loop is not None: + return current_loop + return asyncio.events.get_event_loop_policy().get_event_loop() + + asyncio.events.get_event_loop = _get_event_loop + asyncio.events._get_running_loop = _get_running_loop + asyncio.events._set_running_loop = set_running_loop + + asyncio._get_running_loop = asyncio.events._get_running_loop + asyncio._set_running_loop = asyncio.events._set_running_loop + +# noinspection PyUnresolvedReferences +import aiocontextvars # pylint: disable=unused-import,wrong-import-position # noqa # isort:skip + + +def _run_coroutine_threadsafe(coro, loop): + """ + Patch to create task in the same thread instead of in the callback. + This ensures that contextvars get copied. Python 3.7 copies contextvars + without this. + """ + if not asyncio.coroutines.iscoroutine(coro): + raise TypeError("A coroutine object is required") + future = concurrent.futures.Future() + task = asyncio.ensure_future(coro, loop=loop) + + def callback() -> None: + try: + # noinspection PyProtectedMember,PyUnresolvedReferences + # pylint:disable=protected-access + asyncio.futures._chain_future(task, future) + except Exception as exc: + if future.set_running_or_notify_cancel(): + future.set_exception(exc) + raise + + loop.call_soon_threadsafe(callback) + return future + + +asyncio.run_coroutine_threadsafe = _run_coroutine_threadsafe diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/context/contextvars_context.py b/opentelemetry-sdk/src/opentelemetry/sdk/context/contextvars_context.py new file mode 100644 index 00000000000..0a350e26997 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/context/contextvars_context.py @@ -0,0 +1,48 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from contextvars import ContextVar +from sys import version_info + +from opentelemetry.context import Context +from opentelemetry.context.context import RuntimeContext + +if (3, 5, 3) <= version_info < (3, 7): + import aiocontextvars # type: ignore # pylint:disable=unused-import + +elif (3, 4) < version_info <= (3, 5, 2): + import opentelemetry.sdk.context.aiocontextvarsfix # pylint:disable=unused-import + + +class ContextVarsRuntimeContext(RuntimeContext): + """An implementation of the RuntimeContext interface which wraps ContextVar under + the hood. This is the prefered implementation for usage with Python 3.5+ + """ + + _CONTEXT_KEY = "current_context" + + def __init__(self) -> None: + self._current_context = ContextVar( + self._CONTEXT_KEY, default=Context() + ) + + def set_current(self, context: Context) -> None: + """See `opentelemetry.context.RuntimeContext.set_current`.""" + self._current_context.set(context) + + def get_current(self) -> Context: + """See `opentelemetry.context.RuntimeContext.get_current`.""" + return self._current_context.get() + + +__all__ = ["ContextVarsRuntimeContext"] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/context/threadlocal_context.py b/opentelemetry-sdk/src/opentelemetry/sdk/context/threadlocal_context.py new file mode 100644 index 00000000000..26d4329c52f --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/context/threadlocal_context.py @@ -0,0 +1,44 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +from opentelemetry.context import Context, RuntimeContext + + +class ThreadLocalRuntimeContext(RuntimeContext): + """An implementation of the RuntimeContext interface + which uses thread-local storage under the hood. This + implementation is available for usage with Python 3.4. + """ + + _CONTEXT_KEY = "current_context" + + def __init__(self) -> None: + self._current_context = threading.local() + + def set_current(self, context: Context) -> None: + """See `opentelemetry.context.RuntimeContext.set_current`.""" + setattr(self._current_context, self._CONTEXT_KEY, context) + + def get_current(self) -> Context: + """See `opentelemetry.context.RuntimeContext.get_current`.""" + if not hasattr(self._current_context, self._CONTEXT_KEY): + setattr( + self._current_context, self._CONTEXT_KEY, Context(), + ) + return getattr(self._current_context, self._CONTEXT_KEY) + + +__all__ = ["ThreadLocalRuntimeContext"] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/distributedcontext/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/distributedcontext/__init__.py index a20cbf89635..7a0a66a8a9a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/distributedcontext/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/distributedcontext/__init__.py @@ -16,33 +16,27 @@ from contextlib import contextmanager from opentelemetry import distributedcontext as dctx_api -from opentelemetry.context import Context +from opentelemetry.context import Context, get_value, set_value +from opentelemetry.distributedcontext import ( + distributed_context_from_context, + with_distributed_context, +) class DistributedContextManager(dctx_api.DistributedContextManager): """See `opentelemetry.distributedcontext.DistributedContextManager` - Args: - name: The name of the context manager """ - def __init__(self, name: str = "") -> None: - if name: - slot_name = "DistributedContext.{}".format(name) - else: - slot_name = "DistributedContext" - - self._current_context = Context.register_slot(slot_name) - def get_current_context( - self, + self, context: typing.Optional[Context] = None ) -> typing.Optional[dctx_api.DistributedContext]: """Gets the current DistributedContext. Returns: A DistributedContext instance representing the current context. """ - return self._current_context.get() + return distributed_context_from_context(context=context) @contextmanager def use_context( @@ -58,9 +52,10 @@ def use_context( Args: context: A DistributedContext instance to make current. """ - snapshot = self._current_context.get() - self._current_context.set(context) + snapshot = distributed_context_from_context() + with_distributed_context(context) + try: yield context finally: - self._current_context.set(snapshot) + with_distributed_context(snapshot) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 7a1594db63c..6d249e65080 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -22,11 +22,12 @@ from types import TracebackType from typing import Iterator, Optional, Sequence, Tuple, Type +from opentelemetry import context as context_api from opentelemetry import trace as trace_api -from opentelemetry.context import Context from opentelemetry.sdk import util from opentelemetry.sdk.util import BoundedDict, BoundedList from opentelemetry.trace import SpanContext, sampling +from opentelemetry.trace.propagation import get_span_key from opentelemetry.trace.status import Status, StatusCanonicalCode from opentelemetry.util import time_ns, types @@ -540,16 +541,14 @@ def use_span( ) -> Iterator[trace_api.Span]: """See `opentelemetry.trace.Tracer.use_span`.""" try: - span_snapshot = self.source.get_current_span() - self.source._current_span_slot.set( # pylint:disable=protected-access - span + context_snapshot = context_api.get_current() + context_api.set_current( + context_api.set_value(self.source.key, span) ) try: yield span finally: - self.source._current_span_slot.set( # pylint:disable=protected-access - span_snapshot - ) + context_api.set_current(context_snapshot) except Exception as error: # pylint: disable=broad-except if ( @@ -580,7 +579,7 @@ def __init__( ): # TODO: How should multiple TracerSources behave? Should they get their own contexts? # This could be done by adding `str(id(self))` to the slot name. - self._current_span_slot = Context.register_slot("current_span") + self.key = get_span_key(tracer_source_id=str(id(self))) self._active_span_processor = MultiSpanProcessor() self.sampler = sampler self._atexit_handler = None @@ -603,7 +602,7 @@ def get_tracer( ) def get_current_span(self) -> Span: - return self._current_span_slot.get() + return context_api.get_value(self.key) # type: ignore def add_span_processor(self, span_processor: SpanProcessor) -> None: """Registers a new :class:`SpanProcessor` for this `TracerSource`. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 87f4a097d50..0a1b1c8041d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -19,7 +19,7 @@ import typing from enum import Enum -from opentelemetry.context import Context +from opentelemetry.context import get_current, set_current, set_value from opentelemetry.trace import DefaultSpan from opentelemetry.util import time_ns @@ -75,12 +75,14 @@ def on_start(self, span: Span) -> None: pass def on_end(self, span: Span) -> None: - with Context.use(suppress_instrumentation=True): - try: - self.span_exporter.export((span,)) - # pylint: disable=broad-except - except Exception: - logger.exception("Exception while exporting Span.") + backup_context = get_current() + set_current(set_value("suppress_instrumentation", True)) + try: + self.span_exporter.export((span,)) + # pylint: disable=broad-except + except Exception: + logger.exception("Exception while exporting Span.") + set_current(backup_context) def shutdown(self) -> None: self.span_exporter.shutdown() @@ -200,16 +202,16 @@ def export(self) -> None: else: self.spans_list[idx] = span idx += 1 - with Context.use(suppress_instrumentation=True): - try: - # Ignore type b/c the Optional[None]+slicing is too "clever" - # for mypy - self.span_exporter.export( - self.spans_list[:idx] - ) # type: ignore - # pylint: disable=broad-except - except Exception: - logger.exception("Exception while exporting Span batch.") + backup_context = get_current() + set_current(set_value("suppress_instrumentation", True)) + try: + # Ignore type b/c the Optional[None]+slicing is too "clever" + # for mypy + self.span_exporter.export(self.spans_list[:idx]) # type: ignore + # pylint: disable=broad-except + except Exception: + logger.exception("Exception while exporting Span batch.") + set_current(backup_context) if notify_flush: with self.flush_condition: diff --git a/opentelemetry-sdk/tests/conftest.py b/opentelemetry-sdk/tests/conftest.py new file mode 100644 index 00000000000..59e306f1303 --- /dev/null +++ b/opentelemetry-sdk/tests/conftest.py @@ -0,0 +1,30 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from os import environ +from sys import version_info + + +def pytest_sessionstart(session): + # pylint: disable=unused-argument + if version_info < (3, 5): + # contextvars are not supported in 3.4, use thread-local storage + environ["OPENTELEMETRY_CONTEXT"] = "threadlocal_context" + else: + environ["OPENTELEMETRY_CONTEXT"] = "contextvars_context" + + +def pytest_sessionfinish(session): + # pylint: disable=unused-argument + environ.pop("OPENTELEMETRY_CONTEXT") diff --git a/opentelemetry-sdk/tests/context/test_asyncio.py b/opentelemetry-sdk/tests/context/test_asyncio.py new file mode 100644 index 00000000000..5dc3637598e --- /dev/null +++ b/opentelemetry-sdk/tests/context/test_asyncio.py @@ -0,0 +1,154 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import unittest +from unittest.mock import patch + +from opentelemetry import context +from opentelemetry.sdk import trace +from opentelemetry.sdk.trace import export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +try: + import contextvars # pylint: disable=unused-import + from opentelemetry.sdk.context.contextvars_context import ( + ContextVarsRuntimeContext, + ) +except ImportError: + raise unittest.SkipTest("contextvars not available") + + +_SPAN_NAMES = [ + "test_span1", + "test_span2", + "test_span3", + "test_span4", + "test_span5", +] + + +def stop_loop_when(loop, cond_func, timeout=5.0): + """Registers a periodic callback that stops the loop when cond_func() == True. + Compatible with both Tornado and asyncio. + """ + if cond_func() or timeout <= 0.0: + loop.stop() + return + + timeout -= 0.1 + loop.call_later(0.1, stop_loop_when, loop, cond_func, timeout) + + +def do_work() -> None: + context.set_current(context.set_value("say", "bar")) + + +class TestAsyncio(unittest.TestCase): + @asyncio.coroutine + def task(self, name): + with self.tracer.start_as_current_span(name): + context.set_value("say", "bar") + + def submit_another_task(self, name): + self.loop.create_task(self.task(name)) + + def setUp(self): + self.previous_context = context.get_current() + context.set_current(context.Context()) + self.tracer_source = trace.TracerSource() + self.tracer = self.tracer_source.get_tracer(__name__) + self.memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_source.add_span_processor(span_processor) + self.loop = asyncio.get_event_loop() + + def tearDown(self): + context.set_current(self.previous_context) + + @patch( + "opentelemetry.context._RUNTIME_CONTEXT", ContextVarsRuntimeContext() + ) + def test_with_asyncio(self): + with self.tracer.start_as_current_span("asyncio_test"): + for name in _SPAN_NAMES: + self.submit_another_task(name) + + stop_loop_when( + self.loop, + lambda: len(self.memory_exporter.get_finished_spans()) >= 5, + timeout=5.0, + ) + self.loop.run_forever() + span_list = self.memory_exporter.get_finished_spans() + span_names_list = [span.name for span in span_list] + expected = [ + "test_span1", + "test_span2", + "test_span3", + "test_span4", + "test_span5", + "asyncio_test", + ] + self.assertCountEqual(span_names_list, expected) + span_names_list.sort() + expected.sort() + self.assertListEqual(span_names_list, expected) + expected_parent = next( + span for span in span_list if span.name == "asyncio_test" + ) + for span in span_list: + if span is expected_parent: + continue + self.assertEqual(span.parent, expected_parent) + + +class TestContextVarsContext(unittest.TestCase): + def setUp(self): + self.previous_context = context.get_current() + + def tearDown(self): + context.set_current(self.previous_context) + + @patch( + "opentelemetry.context._RUNTIME_CONTEXT", ContextVarsRuntimeContext() + ) + def test_context(self): + self.assertIsNone(context.get_value("say")) + empty = context.get_current() + second = context.set_value("say", "foo") + + self.assertEqual(context.get_value("say", context=second), "foo") + + do_work() + self.assertEqual(context.get_value("say"), "bar") + third = context.get_current() + + self.assertIsNone(context.get_value("say", context=empty)) + self.assertEqual(context.get_value("say", context=second), "foo") + self.assertEqual(context.get_value("say", context=third), "bar") + + @patch( + "opentelemetry.context._RUNTIME_CONTEXT", ContextVarsRuntimeContext() + ) + def test_set_value(self): + first = context.set_value("a", "yyy") + second = context.set_value("a", "zzz") + third = context.set_value("a", "---", first) + self.assertEqual("yyy", context.get_value("a", context=first)) + self.assertEqual("zzz", context.get_value("a", context=second)) + self.assertEqual("---", context.get_value("a", context=third)) + self.assertEqual(None, context.get_value("a")) diff --git a/opentelemetry-sdk/tests/context/test_context.py b/opentelemetry-sdk/tests/context/test_context.py new file mode 100644 index 00000000000..88a63109d16 --- /dev/null +++ b/opentelemetry-sdk/tests/context/test_context.py @@ -0,0 +1,63 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import patch + +from opentelemetry import context +from opentelemetry.sdk.context.threadlocal_context import ( + ThreadLocalRuntimeContext, +) + + +def do_work() -> None: + context.set_current(context.set_value("say", "bar")) + + +class TestThreadLocalContext(unittest.TestCase): + def setUp(self): + self.previous_context = context.get_current() + + def tearDown(self): + context.set_current(self.previous_context) + + @patch( + "opentelemetry.context._RUNTIME_CONTEXT", ThreadLocalRuntimeContext() + ) + def test_context(self): + self.assertIsNone(context.get_value("say")) + empty = context.get_current() + second = context.set_value("say", "foo") + + self.assertEqual(context.get_value("say", context=second), "foo") + + do_work() + self.assertEqual(context.get_value("say"), "bar") + third = context.get_current() + + self.assertIsNone(context.get_value("say", context=empty)) + self.assertEqual(context.get_value("say", context=second), "foo") + self.assertEqual(context.get_value("say", context=third), "bar") + + @patch( + "opentelemetry.context._RUNTIME_CONTEXT", ThreadLocalRuntimeContext() + ) + def test_set_value(self): + first = context.set_value("a", "yyy") + second = context.set_value("a", "zzz") + third = context.set_value("a", "---", first) + self.assertEqual("yyy", context.get_value("a", context=first)) + self.assertEqual("zzz", context.get_value("a", context=second)) + self.assertEqual("---", context.get_value("a", context=third)) + self.assertEqual(None, context.get_value("a")) diff --git a/opentelemetry-sdk/tests/context/test_threads.py b/opentelemetry-sdk/tests/context/test_threads.py new file mode 100644 index 00000000000..e8552b9135e --- /dev/null +++ b/opentelemetry-sdk/tests/context/test_threads.py @@ -0,0 +1,87 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from multiprocessing.dummy import Pool +from unittest.mock import patch + +from opentelemetry import context +from opentelemetry.sdk import trace +from opentelemetry.sdk.context.threadlocal_context import ( + ThreadLocalRuntimeContext, +) +from opentelemetry.sdk.trace import export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + + +class TestThreads(unittest.TestCase): + span_names = [ + "test_span1", + "test_span2", + "test_span3", + "test_span4", + "test_span5", + ] + + def do_work(self, name="default"): + with self.tracer.start_as_current_span(name): + context.set_value("say-something", "bar") + + def setUp(self): + self.previous_context = context.get_current() + context.set_current(context.Context()) + self.tracer_source = trace.TracerSource() + self.tracer = self.tracer_source.get_tracer(__name__) + self.memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_source.add_span_processor(span_processor) + + def tearDown(self): + context.set_current(self.previous_context) + + @patch( + "opentelemetry.context._RUNTIME_CONTEXT", ThreadLocalRuntimeContext() + ) + def test_with_threads(self): + with self.tracer.start_as_current_span("threads_test"): + pool = Pool(5) # create a thread pool + pool.map( + context.with_current_context(self.do_work), self.span_names + ) + pool.close() + pool.join() + span_list = self.memory_exporter.get_finished_spans() + span_names_list = [span.name for span in span_list] + expected = [ + "test_span1", + "test_span2", + "test_span3", + "test_span4", + "test_span5", + "threads_test", + ] + self.assertCountEqual(span_names_list, expected) + span_names_list.sort() + expected.sort() + self.assertListEqual(span_names_list, expected) + expected_parent = next( + span for span in span_list if span.name == "threads_test" + ) + # FIXME + for span in span_list: + if span is expected_parent: + continue + self.assertEqual(span.parent, expected_parent) diff --git a/tox.ini b/tox.ini index 4195e8629da..51eda59d70c 100644 --- a/tox.ini +++ b/tox.ini @@ -82,6 +82,7 @@ commands_pre = jaeger: pip install {toxinidir}/opentelemetry-sdk jaeger: pip install {toxinidir}/ext/opentelemetry-ext-jaeger opentracing-shim: pip install {toxinidir}/opentelemetry-sdk {toxinidir}/ext/opentelemetry-ext-opentracing-shim + zipkin: pip install {toxinidir}/opentelemetry-sdk zipkin: pip install {toxinidir}/ext/opentelemetry-ext-zipkin ; In order to get a healthy coverage report,