Adding a working propagator, adding to integrations and example #137

Merged
@@ -22,6 +22,8 @@

from requests.sessions import Session

import opentelemetry.propagator as propagator
Member

Why not from opentelemetry import propagator?

Contributor

Btw, I'd prefer opentelemetry.propagation, but that's based on personal taste ;)

Member

But then we've got opentelemetry.context.propagation and opentelemetry.propagation, which is why I'd prefer propagators if we keep this package structure.

...but in any case all I mean to complain about here is the from x.y import y as x import pattern. :P

Member Author

Yeah, I can fix that. I was not aware of the ability to import modules that way.
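
For reference, the two import spellings discussed in this thread bind the same module object. The sketch below uses the standard library's os.path as a stand-in, since nothing about the opentelemetry package layout is assumed here; the variable names are illustrative only.

```python
# Dotted import with an alias: the `import x.y as y` spelling.
import os.path as path_via_dotted_import

# The `from x import y` spelling suggested in the review.
from os import path as path_via_from_import

# Both names refer to the very same module object.
same_module = path_via_dotted_import is path_via_from_import
```

The second form avoids repeating the module name, which is the redundancy the reviewer objects to.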



# NOTE: Currently we force passing a tracer. But in turn, this forces the user
# to configure a SDK before enabling this integration. In turn, this means that
@@ -72,6 +74,11 @@ def instrumented_request(self, method, url, *args, **kwargs):
# TODO: Propagate the trace context via headers once we have a way
# to access propagators.

headers = kwargs.get("headers", {})
Member

Instead of get + set, use kwargs.setdefault.
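
A minimal sketch of the difference, using a plain dict as a stand-in for the keyword arguments of instrumented_request. The fake_inject helper and its header name are hypothetical and exist only to make the example self-contained.

```python
def fake_inject(set_in_carrier, carrier):
    # Hypothetical stand-in for a propagator's inject(); the header
    # name and value are illustrative only.
    set_in_carrier(carrier, "x-example-trace", "abc123")


def with_get_and_set(kwargs):
    # Pattern used in the diff: read with a default, then write back.
    headers = kwargs.get("headers", {})
    fake_inject(type(headers).__setitem__, headers)
    kwargs["headers"] = headers
    return kwargs


def with_setdefault(kwargs):
    # Suggested pattern: setdefault stores the default in kwargs and
    # returns it, so no separate write-back is needed.
    headers = kwargs.setdefault("headers", {})
    fake_inject(type(headers).__setitem__, headers)
    return kwargs


a = with_get_and_set({})
b = with_setdefault({})
c = with_setdefault({"headers": {"accept": "text/html"}})
```

Both variants leave kwargs in the same state; setdefault just does it in one step, and existing user headers are kept alongside the injected one.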

propagator.get_global_propagator().inject(
tracer, type(headers).__setitem__, headers
)
kwargs["headers"] = headers
result = wrapped(self, method, url, *args, **kwargs) # *** PROCEED

span.set_attribute("http.status_code", result.status_code)
25 changes: 19 additions & 6 deletions ext/opentelemetry-ext-wsgi/src/opentelemetry/ext/wsgi/__init__.py
@@ -19,9 +19,10 @@
"""

import functools
+import typing
import wsgiref.util as wsgiref_util

-from opentelemetry import trace
+from opentelemetry import propagator, trace
from opentelemetry.ext.wsgi.version import __version__ # noqa


@@ -35,12 +36,9 @@ class OpenTelemetryMiddleware:
wsgi: The WSGI application callable.
"""

-def __init__(self, wsgi, propagators=None):
+def __init__(self, wsgi):
self.wsgi = wsgi

-# TODO: implement context propagation
-self.propagators = propagators
-
@staticmethod
def _add_request_attributes(span, environ):
span.set_attribute("component", "http")
@@ -87,8 +85,11 @@ def __call__(self, environ, start_response):

tracer = trace.tracer()
path_info = environ["PATH_INFO"] or "/"
+parent_span = propagator.get_global_propagator().extract(
Member

I'd name that parent_context (because it is no span but a context).

Member Author

Is that the right call? trace/__init__.py defines the type as "ParentSpan".

Member Author

I'd argue that both should be called either ParentContext or ParentSpan.

+get_header_from_environ, environ
+)

-with tracer.start_span(path_info) as span:
+with tracer.start_span(path_info, parent_span) as span:
self._add_request_attributes(span, environ)
start_response = self._create_start_response(span, start_response)

@@ -99,3 +100,15 @@ def __call__(self, environ, start_response):
finally:
if hasattr(iterable, "close"):
iterable.close()


def get_header_from_environ(
environ: dict, header_name: str
) -> typing.Optional[str]:
Member

The type is wrong. If we want to use type annotations for extensions, we must also run mypy on them (I wouldn't be against that, btw, as it is also useful without explicit type annotations). Also this returns [None] if the header is not found, which looks suspicious (it now returns typing.List[typing.Optional[str]], but probably it should do either typing.Optional[typing.List[str]] or even just typing.List[str], returning an empty list if the header wasn't found).

Member Author

yes, good point. This has been an outstanding issue since the initial b3 PR. I'll fix that.

Member Author

Actually, I realized the b3 code is fine, but I will fix this for the wsgi spec.
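
One of the options raised in this thread is to return typing.List[str] and use an empty list when the header is absent. The sketch below illustrates that option only; it is not necessarily the version that was eventually merged, and the sample header values are made up.

```python
import typing


def get_header_from_environ(
    environ: dict, header_name: str
) -> typing.List[str]:
    """Return the header's values from a WSGI environ dictionary.

    Returns an empty list when the header is absent.
    """
    # WSGI exposes request headers as HTTP_<NAME> with dashes
    # replaced by underscores.
    environ_key = "HTTP_" + header_name.upper().replace("-", "_")
    value = environ.get(environ_key)
    return [] if value is None else [value]


found = get_header_from_environ({"HTTP_X_B3_TRACEID": "abc"}, "x-b3-traceid")
missing = get_header_from_environ({}, "x-b3-traceid")
```

With this shape the annotation and the return value agree, and callers can iterate over the result without a None check.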

"""Retrieve the header value from the wsgi environ dictionary.

Returns:
A string with the header value if it exists, else None.
"""
environ_key = "HTTP_" + header_name.upper().replace("-", "_")
return [environ.get(environ_key)]
4 changes: 3 additions & 1 deletion ext/opentelemetry-ext-wsgi/tests/test_wsgi_middleware.py
@@ -125,7 +125,9 @@ def validate_response(self, response, error=None):
self.assertIsNone(self.exc_info)

# Verify that start_span has been called
-self.start_span.assert_called_once_with("/")
+self.start_span.assert_called_once_with(
+"/", trace_api.INVALID_SPAN_CONTEXT
+)

def test_basic_wsgi_call(self):
app = OpenTelemetryMiddleware(simple_wsgi)
@@ -0,0 +1,20 @@
import opentelemetry.trace as trace
Member

Heads up that these new files need the license boilerplate.

Member Author

Sorry, will add. Side question: why do we need this in every file? Is that a better choice to make from a legal perspective?

Member

AFAICT we're just following Apache's own guidance here. There may be a legal reason to do this, but that question is above my paygrade.

from opentelemetry.context.propagation import httptextformat


class TraceStateHTTPTextFormat(httptextformat.HTTPTextFormat):
Member Author

I figured since we want to propagate TraceState by default, we can eliminate an invalid propagator by sticking this here for now.

Member

Personally I'd prefer to keep the API doing nothing. But open-telemetry/opentelemetry-specification#208 and open-telemetry/opentelemetry-specification#183 are still open, so I'm OK with this for now.

Member

Wouldn't the TraceState be extracted/injected along with the rest of the SpanContext? Why separate it here?

Member

After reading the rest of the PR, I see that this might just be a naming issue. Would you expect SpanContextHTTPTextFormat to describe the same thing?

Member

I think the proper name would be W3CTraceContextHTTPTextFormatter (or some abbreviation thereof).

Contributor

@c24t @toumorokoshi
Just to clarify, the intent behind TraceStateHTTPTextFormat is to actually cover SpanContext right? So it should simply just be renamed?

Member

As I understand it, yes.

"""TODO: a propagator that extracts and injects tracestate.
"""

def extract(
self, _get_from_carrier: httptextformat.Getter, _carrier: object
) -> trace.SpanContext:
return trace.INVALID_SPAN_CONTEXT

def inject(
self,
context: trace.SpanContext,
set_in_carrier: httptextformat.Setter,
carrier: object,
) -> None:
pass
77 changes: 77 additions & 0 deletions opentelemetry-api/src/opentelemetry/propagator/__init__.py
@@ -0,0 +1,77 @@
import typing
Member

We probably want to call this package propagators, plural like the others. FWIW this was called propagation in OC, but was also under the trace package.


import opentelemetry.context.propagation.httptextformat as httptextformat
import opentelemetry.trace as trace
from opentelemetry.context.propagation.tracestatehttptextformat import (
TraceStateHTTPTextFormat,
)


class Propagator:
Member

Does this need to be a class? Couldn't we make these free helper functions and have the selected formatters directly be globals? Or if we want to give API implementations more freedom, we should move the concept of formatters outside the SDK layer.

Member Author

helper functions would work fine here... there's nothing that an SDK has to override really, and this was more important when Context was an object that was passed into the constructor.

I could do the work to set httpformatters as globals. I think the shape of globals will ultimately change anyway, so I don't particularly mind how the configuration of httptextformatters looks for this specific PR.
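
A rough sketch of the "free helper functions plus a global formatter" shape discussed in this thread. Every name here (ExampleFormat, get_global_httptextformat, set_global_httptextformat, the header name) is hypothetical and chosen for illustration; none of it is taken from the opentelemetry API.

```python
import typing

Setter = typing.Callable[[dict, str, str], None]


class ExampleFormat:
    """Hypothetical formatter; stands in for an HTTPTextFormat."""

    def inject(
        self, context: str, set_in_carrier: Setter, carrier: dict
    ) -> None:
        set_in_carrier(carrier, "x-example-context", context)


# Module-level global holding the selected formatter.
_HTTP_TEXT_FORMAT = ExampleFormat()


def get_global_httptextformat() -> ExampleFormat:
    return _HTTP_TEXT_FORMAT


def set_global_httptextformat(http_text_format: ExampleFormat) -> None:
    global _HTTP_TEXT_FORMAT  # pylint:disable=global-statement
    _HTTP_TEXT_FORMAT = http_text_format


def inject(context: str, set_in_carrier: Setter, carrier: dict) -> None:
    """Free helper: delegate to whichever formatter is selected."""
    get_global_httptextformat().inject(context, set_in_carrier, carrier)


headers: dict = {}
inject("ctx-1", dict.__setitem__, headers)
```

Compared with a Propagator class, integrations call the helper directly and configuration reduces to swapping the global formatter.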

"""Class which encapsulates propagation of values to and from context.

In contrast to using the formatters directly, a propagator object can
help own configuration around which formatters to use, as well as
help simplify the work require for integrations to use the intended
Member

Suggested change
-help simplify the work require for integrations to use the intended
+help simplify the work required for integrations to use the intended

formatters.
"""

def __init__(self, httptextformat_instance: httptextformat.HTTPTextFormat):
Member

IIRC one of the open questions about propagation is whether we should have separate interfaces for text and binary formats. This PR seems to use separate interfaces, and default to HTTP/text. Is that intentional or just a consequence of the fact that the binary format hasn't been written yet?

Member

Also, as a nit: httptextformat_instance is a mouthful. What about just format here? Or supplier, assuming that's the right use.

self._httptextformat = httptextformat_instance

def extract(
self, get_from_carrier: httptextformat.Getter, carrier: object
Member

@Oberon00 what are your thoughts on making the Getter/Setter types generic here to get rid of this carrier object?

https://github.com/toumorokoshi/opentelemetry-python/pull/1/files

This is cribbed from the loader, there may be a better way to do this.

Member Author

Sounds good, thanks for the PR!
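
A sketch of what generic Getter/Setter aliases could look like, assuming a type variable for the carrier in place of a bare object. The names CarrierT, copy_header and get_from_dict are hypothetical, and this is not claimed to match the linked pull request.

```python
import typing

# The carrier type is a type variable, so a getter or setter and the
# carrier it operates on must agree on one concrete type.
CarrierT = typing.TypeVar("CarrierT")

Getter = typing.Callable[[CarrierT, str], typing.List[str]]
Setter = typing.Callable[[CarrierT, str, str], None]


def copy_header(
    get_from_carrier: Getter[CarrierT],
    set_in_carrier: Setter[CarrierT],
    source: CarrierT,
    target: CarrierT,
    key: str,
) -> None:
    """Copy every value of `key` from source to target (last one wins)."""
    for value in get_from_carrier(source, key):
        set_in_carrier(target, key, value)


def get_from_dict(
    carrier: typing.Dict[str, str], key: str
) -> typing.List[str]:
    return [carrier[key]] if key in carrier else []


incoming = {"x-example": "1"}
outgoing: typing.Dict[str, str] = {}
copy_header(get_from_dict, dict.__setitem__, incoming, outgoing, "x-example")
```

A type checker can then flag a getter written for one carrier type being paired with a different carrier, which an object annotation cannot do.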

) -> typing.Union[trace.SpanContext, trace.Span, None]:
Member

We might have covered it in another PR, but what's the use case for returning a Span here?

Member Author

this is what the ParentSpan type is a union of. But there's probably no good reason, it could just be SpanContext.

"""Load the parent SpanContext from values in the carrier.

Using the specified HTTPTextFormatter, the propagator will
extract a SpanContext from the carrier. If one is found,
it will be set as the parent context of the current span.

Args:
get_from_carrier: a function that can retrieve zero
or more values from the carrier. In the case that
the value does not exist, return an empty list.
carrier: and object which contains values that are
used to construct a SpanContext. This object
must be paired with an appropriate get_from_carrier
which understands how to extract a value from it.
"""
span_context = self._httptextformat.extract(get_from_carrier, carrier)
return span_context if span_context else trace.Tracer.CURRENT_SPAN
Member

Why return the CURRENT_SPAN if extract failed? In effect, the return value can only be used as argument for Tracer.start_span. In that case, you might use the trace.ParentSpan type alias.

But I think there is an argument to be made that if there is unexpectedly no incoming span context, one would usually prefer to start a new trace instead of continuing the existing one, which would be implemented by just return span_context.

Member Author

sorry, you're completely right. I think I misunderstood and thought that CURRENT_SPAN was a sentinel value that was required to make sure no exceptions were raised.

I'll fix this to just be Optional[ParentSpan]

Member

Wouldn't the current span be null anyway? Since we pick up the trace ID from the incoming request, any span that we've created prior won't belong to the same trace.

I don't think the propagator should just return null here though. In OC we create a new SpanContext with new (valid) IDs when we can't extract the span context. I don't know that we want to do the same thing here, and may want to use e.g. the invalid span instead.

Member

True, an invalid span context would be even better; then the return type would simply be SpanContext.

@toumorokoshi: The ParentSpan type already includes None, so putting it in an Optional is redundant.
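
To make the option discussed in this thread concrete, here is a sketch of an extract that returns an invalid span context when nothing usable is found, so the return type is always SpanContext. The SpanContext dataclass, the INVALID_SPAN_CONTEXT value and the x-example-* header names are hypothetical stand-ins and not the real opentelemetry definitions.

```python
import typing
from dataclasses import dataclass


@dataclass(frozen=True)
class SpanContext:
    # Hypothetical stand-in for trace.SpanContext.
    trace_id: int
    span_id: int


# Hypothetical stand-in for trace.INVALID_SPAN_CONTEXT.
INVALID_SPAN_CONTEXT = SpanContext(trace_id=0, span_id=0)

Getter = typing.Callable[[dict, str], typing.List[str]]


def extract(get_from_carrier: Getter, carrier: dict) -> SpanContext:
    """Return the incoming SpanContext, or the invalid one if absent."""
    trace_ids = get_from_carrier(carrier, "x-example-traceid")
    span_ids = get_from_carrier(carrier, "x-example-spanid")
    if not trace_ids or not span_ids:
        # No usable incoming context: signal that with the sentinel
        # instead of falling back to the current span.
        return INVALID_SPAN_CONTEXT
    # IDs are assumed to be hex-encoded in this sketch.
    return SpanContext(int(trace_ids[0], 16), int(span_ids[0], 16))


def get_from_dict(carrier: dict, key: str) -> typing.List[str]:
    value = carrier.get(key)
    return [] if value is None else [value]


parsed = extract(
    get_from_dict, {"x-example-traceid": "ff", "x-example-spanid": "0a"}
)
fallback = extract(get_from_dict, {})
```

Callers can pass the result straight on as a parent without checking for None, and a missing header does not continue whatever span happens to be current.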


def inject(
self,
tracer: trace.Tracer,
set_in_carrier: httptextformat.Setter,
carrier: object,
) -> None:
"""Inject values from the current context into the carrier.

inject enables the propagation of values into HTTP clients or
other objects which perform an HTTP request. Implementations
should use the set_in_carrier method to set values on the
carrier.

Args:
set_in_carrier: A setter function that can set values
on the carrier.
carrier: An object that a place to define HTTP headers.
Member

Suggested change
-carrier: An object that a place to define HTTP headers.
+carrier: An object defines HTTP headers.

Or something similar, assuming this is a typo.

Should be paired with set_in_carrier, which should
know how to set header values on the carrier.
"""
self._httptextformat.inject(
tracer.get_current_span().get_context(), set_in_carrier, carrier
)


_PROPAGATOR = Propagator(TraceStateHTTPTextFormat())


def get_global_propagator() -> Propagator:
Member Author

I know the getter / setter were supposed to be attached to the tracer. This would have created a cyclic dependency where Tracer (trace module) -> Propagator (propagator module) -> (trace module).

I figure that there's a few RFCs floating around which are trying to tear propagators away from tracers anyway, so a separate global is as good as anywhere.

return _PROPAGATOR


def set_global_propagator(propagator: Propagator) -> None:
global _PROPAGATOR # pylint:disable=global-statement
_PROPAGATOR = propagator
@@ -16,13 +16,13 @@
This module serves as an example to integrate with flask, using
the requests library to perform downstream requests
"""
-import time
-
import flask
+import requests

import opentelemetry.ext.http_requests
-from opentelemetry import trace
+from opentelemetry import propagator, trace
from opentelemetry.ext.wsgi import OpenTelemetryMiddleware
+from opentelemetry.sdk.context.propagation.b3_format import B3Format
from opentelemetry.sdk.trace import Tracer


@@ -39,14 +39,20 @@ def configure_opentelemetry(flask_app: flask.Flask):

* processors?
* exporters?
* propagators?
"""
# Start by configuring all objects required to ensure
# a complete end to end workflow.
# the preferred implementation of these objects must be set,
# as the opentelemetry-api defines the interface with a no-op
# implementation.
trace.set_preferred_tracer_implementation(lambda _: Tracer())
# Next, we need to configure how the values that are used by
# traces and metrics are propagated (such as what specific headers
# carry this value).

# TBD: can remove once default TraceState propagators are installed.
propagator.set_global_propagator(propagator.Propagator(B3Format()))

# Integrations are the glue that binds the OpenTelemetry API
# and the frameworks and libraries that are used together, automatically
# creating Spans and propagating context as appropriate.
@@ -61,8 +67,8 @@ def configure_opentelemetry(flask_app: flask.Flask):
def hello():
# emit a trace that measures how long the
# sleep takes
-with trace.tracer().start_span("sleep"):
-time.sleep(0.001)
+with trace.tracer().start_span("example-request"):
+requests.get("http://www.example.com")
return "hello"


49 changes: 46 additions & 3 deletions opentelemetry-example-app/tests/test_flask_example.py
@@ -1,14 +1,57 @@
import unittest
from unittest import mock

import requests
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse

import opentelemetry_example_app.flask_example as flask_example
from opentelemetry.sdk import trace
from opentelemetry.sdk.context.propagation import b3_format


class TestFlaskExample(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.app = flask_example.app

def setUp(self):
mocked_response = requests.models.Response()
mocked_response.status_code = 200
mocked_response.reason = "Roger that!"
self.send_patcher = mock.patch.object(
requests.Session,
"send",
autospec=True,
spec_set=True,
return_value=mocked_response,
)
self.send = self.send_patcher.start()

def tearDown(self):
self.send_patcher.stop()

def test_full_path(self):
Member Author

this is now a full-fledged propagation test. We could probably build on this once we have stuff like exporters.

@c24t I think this will work for one of the test cases we discussed, although not as comprehensive as bringing up a full server.

with self.app.test_client() as client:
response = client.get("/")
assert response.data.decode() == "hello"
trace_id = trace.generate_trace_id()
# We need to use the Werkzeug test app because
# The headers are injected at the wsgi layer.
# The flask test app will not include these, and
# result in the values not propagated.
client = Client(self.app.wsgi_app, BaseResponse)
# emulate b3 headers
client.get(
"/",
headers={
Member

We could use the B3 formatter's inject directly on the dict here.

Member Author

that might be a little tricky: it would require that I have direct access to the SpanContext for the app, which may not occur in situations where the app lives in a different thread or context than the test code itself.

I feel like this is a more thorough test of the defined behavior, although I definitely see the merit of not effectively redefining the b3 interface.

"x-b3-traceid": b3_format.format_trace_id(trace_id),
"x-b3-spanid": b3_format.format_span_id(
trace.generate_span_id()
),
"x-b3-sampled": "1",
},
)
# assert the http request header was propagated through.
prepared_request = self.send.call_args[0][1]
Member

Just to be sure: This does not test the headers that were "sent" by client.get but the headers that were sent by the example app using requests?

Member Author

there's not an easy way to differentiate between headers that were directly set by a user vs the headers that were set in the propagator: both are setting the headers keyword that is passed in as part of the request.

Theoretically someone could modify the examples to send the same headers that the propagator is responsible for, but that's not the case today. Also, the way that the integration is written, propagator headers will override any user-defined headers.
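
The override behaviour follows from injecting with the carrier's own __setitem__. A small sketch, where inject_example is a hypothetical stand-in for the propagator and the header values are made up:

```python
def inject_example(set_in_carrier, carrier):
    # Hypothetical stand-in for a propagator writing one of its headers.
    set_in_carrier(carrier, "x-b3-sampled", "1")


# A user-supplied header that collides with a propagation header.
user_headers = {"x-b3-sampled": "0", "accept": "text/html"}

# Same call shape as in the integration: the setter is the dict's
# own __setitem__, so an existing key is simply overwritten.
inject_example(type(user_headers).__setitem__, user_headers)
```

Unrelated user headers survive; only keys the propagator writes are replaced.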

headers = prepared_request.headers
for required_header in {"x-b3-traceid", "x-b3-spanid", "x-b3-sampled"}:
assert required_header in headers
Member

We are not using pytest yet, so please use unittest assertions, like self.assertIn.
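
A sketch of the same checks written with unittest assertions. The test class name and the header values are illustrative and not taken from a real request.

```python
import unittest


class TestPropagatedHeaders(unittest.TestCase):
    def test_required_headers_present(self):
        # Illustrative header values; not taken from a real request.
        headers = {
            "x-b3-traceid": "0af7651916cd43dd8448eb211c80319c",
            "x-b3-spanid": "b7ad6b7169203331",
            "x-b3-sampled": "1",
        }
        for required_header in (
            "x-b3-traceid",
            "x-b3-spanid",
            "x-b3-sampled",
        ):
            # assertIn reports both the member and the container on
            # failure, unlike a bare assert.
            self.assertIn(required_header, headers)
        self.assertEqual(headers["x-b3-sampled"], "1")


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestPropagatedHeaders)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Unlike bare assert statements, these checks are not stripped when Python runs with -O, and failure messages are produced by the test framework.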

assert headers["x-b3-traceid"] == b3_format.format_trace_id(trace_id)
@@ -91,7 +91,6 @@ def extract(cls, get_from_carrier, carrier):
# header is set to allow.
if sampled in cls._SAMPLE_PROPAGATE_VALUES or flags == "1":
options |= trace.TraceOptions.RECORDED

return trace.SpanContext(
# trace an span ids are encoded in hex, so must be converted
trace_id=int(trace_id, 16),