Skip to content

Commit

Permalink
Adding Context API (open-telemetry#395)
Browse files Browse the repository at this point in the history
This change implements the Context API portion of OTEP open-telemetry#66. The
CorrelationContext API and Propagation API changes will come in future PRs.
We're leveraging entrypoints to support other implementations of the Context
API if/when necessary. For backwards compatibility, this change uses
aiocontextvars for Python versions older than 3.7.

Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
Co-authored-by: Mauricio Vásquez <mauricio@kinvolk.io>
  • Loading branch information
3 people authored Feb 13, 2020
1 parent c50aab8 commit 72862c9
Show file tree
Hide file tree
Showing 25 changed files with 879 additions and 407 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
opentelemetry.context.base\_context module
==========================================

.. automodule:: opentelemetry.context.base_context
.. automodule:: opentelemetry.context.context
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion docs/opentelemetry.context.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Submodules

.. toctree::

opentelemetry.context.base_context
opentelemetry.context.context

Module contents
---------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@

from requests.sessions import Session

from opentelemetry import propagators
from opentelemetry.context import Context
from opentelemetry import context, propagators
from opentelemetry.ext.http_requests.version import __version__
from opentelemetry.trace import SpanKind

Expand Down Expand Up @@ -54,7 +53,7 @@ def enable(tracer_source):

@functools.wraps(wrapped)
def instrumented_request(self, method, url, *args, **kwargs):
if Context.suppress_instrumentation:
if context.get_value("suppress_instrumentation"):
return wrapped(self, method, url, *args, **kwargs)

# See
Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-api/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@
"/tree/master/opentelemetry-api"
),
zip_safe=False,
entry_points={
"opentelemetry_context": [
"default_context = "
"opentelemetry.context.default_context:"
"DefaultRuntimeContext",
]
},
)
254 changes: 116 additions & 138 deletions opentelemetry-api/src/opentelemetry/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,141 +12,119 @@
# See the License for the specific language governing permissions and
# limitations under the License.


"""
The OpenTelemetry context module provides abstraction layer on top of
thread-local storage and contextvars. The long term direction is to switch to
contextvars provided by the Python runtime library.
A global object ``Context`` is provided to access all the context related
functionalities::
>>> from opentelemetry.context import Context
>>> Context.foo = 1
>>> Context.foo = 2
>>> Context.foo
2
When explicit thread is used, a helper function
``Context.with_current_context`` can be used to carry the context across
threads::
from threading import Thread
from opentelemetry.context import Context
def work(name):
print('Entering worker:', Context)
Context.operation_id = name
print('Exiting worker:', Context)
if __name__ == '__main__':
print('Main thread:', Context)
Context.operation_id = 'main'
print('Main thread:', Context)
# by default context is not propagated to worker thread
thread = Thread(target=work, args=('foo',))
thread.start()
thread.join()
print('Main thread:', Context)
# user can propagate context explicitly
thread = Thread(
target=Context.with_current_context(work),
args=('bar',),
)
thread.start()
thread.join()
print('Main thread:', Context)
Here goes another example using thread pool::
import time
import threading
from multiprocessing.dummy import Pool as ThreadPool
from opentelemetry.context import Context
_console_lock = threading.Lock()
def println(msg):
with _console_lock:
print(msg)
def work(name):
println('Entering worker[{}]: {}'.format(name, Context))
Context.operation_id = name
time.sleep(0.01)
println('Exiting worker[{}]: {}'.format(name, Context))
if __name__ == "__main__":
println('Main thread: {}'.format(Context))
Context.operation_id = 'main'
pool = ThreadPool(2) # create a thread pool with 2 threads
pool.map(Context.with_current_context(work), [
'bear',
'cat',
'dog',
'horse',
'rabbit',
])
pool.close()
pool.join()
println('Main thread: {}'.format(Context))
Here goes a simple demo of how async could work in Python 3.7+::
import asyncio
from opentelemetry.context import Context
class Span(object):
def __init__(self, name):
self.name = name
self.parent = Context.current_span
def __repr__(self):
return ('{}(name={}, parent={})'
.format(
type(self).__name__,
self.name,
self.parent,
))
async def __aenter__(self):
Context.current_span = self
async def __aexit__(self, exc_type, exc, tb):
Context.current_span = self.parent
async def main():
print(Context)
async with Span('foo'):
print(Context)
await asyncio.sleep(0.1)
async with Span('bar'):
print(Context)
await asyncio.sleep(0.1)
print(Context)
await asyncio.sleep(0.1)
print(Context)
if __name__ == '__main__':
asyncio.run(main())
"""

from .base_context import BaseRuntimeContext

__all__ = ["Context"]

try:
from .async_context import AsyncRuntimeContext

Context = AsyncRuntimeContext() # type: BaseRuntimeContext
except ImportError:
from .thread_local_context import ThreadLocalRuntimeContext

Context = ThreadLocalRuntimeContext()
import logging
import typing
from os import environ

from pkg_resources import iter_entry_points

from opentelemetry.context.context import Context, RuntimeContext

logger = logging.getLogger(__name__)
_RUNTIME_CONTEXT = None # type: typing.Optional[RuntimeContext]


def get_value(key: str, context: typing.Optional[Context] = None) -> "object":
"""To access the local state of a concern, the RuntimeContext API
provides a function which takes a context and a key as input,
and returns a value.
Args:
key: The key of the value to retrieve.
context: The context from which to retrieve the value, if None, the current context is used.
"""
return context.get(key) if context is not None else get_current().get(key)


def set_value(
key: str, value: "object", context: typing.Optional[Context] = None
) -> Context:
"""To record the local state of a cross-cutting concern, the
RuntimeContext API provides a function which takes a context, a
key, and a value as input, and returns an updated context
which contains the new value.
Args:
key: The key of the entry to set
value: The value of the entry to set
context: The context to copy, if None, the current context is used
"""
if context is None:
context = get_current()
new_values = context.copy()
new_values[key] = value
return Context(new_values)


def remove_value(
key: str, context: typing.Optional[Context] = None
) -> Context:
"""To remove a value, this method returns a new context with the key
cleared. Note that the removed value still remains present in the old
context.
Args:
key: The key of the entry to remove
context: The context to copy, if None, the current context is used
"""
if context is None:
context = get_current()
new_values = context.copy()
new_values.pop(key, None)
return Context(new_values)


def get_current() -> Context:
"""To access the context associated with program execution,
the RuntimeContext API provides a function which takes no arguments
and returns a RuntimeContext.
"""

global _RUNTIME_CONTEXT # pylint: disable=global-statement
if _RUNTIME_CONTEXT is None:
# FIXME use a better implementation of a configuration manager to avoid having
# to get configuration values straight from environment variables

configured_context = environ.get(
"OPENTELEMETRY_CONTEXT", "default_context"
) # type: str
try:
_RUNTIME_CONTEXT = next(
iter_entry_points("opentelemetry_context", configured_context)
).load()()
except Exception: # pylint: disable=broad-except
logger.error("Failed to load context: %s", configured_context)

return _RUNTIME_CONTEXT.get_current() # type:ignore


def set_current(context: Context) -> Context:
"""To associate a context with program execution, the Context
API provides a function which takes a Context.
Args:
context: The context to use as current.
"""
old_context = get_current()
_RUNTIME_CONTEXT.set_current(context) # type:ignore
return old_context


def with_current_context(
func: typing.Callable[..., "object"]
) -> typing.Callable[..., "object"]:
"""Capture the current context and apply it to the provided func."""

caller_context = get_current()

def call_with_current_context(
*args: "object", **kwargs: "object"
) -> "object":
try:
backup = get_current()
set_current(caller_context)
return func(*args, **kwargs)
finally:
set_current(backup)

return call_with_current_context
45 changes: 0 additions & 45 deletions opentelemetry-api/src/opentelemetry/context/async_context.py

This file was deleted.

Loading

0 comments on commit 72862c9

Please sign in to comment.