Skip to content

Commit

Permalink
opentelemetry callback: context propagation and error exception (#3378)
Browse files Browse the repository at this point in the history
* opentelemetry callback: context propagation and error exception

* Apply suggestions from code review

Co-authored-by: Felix Fontein <felix@fontein.de>

Co-authored-by: Felix Fontein <felix@fontein.de>
  • Loading branch information
v1v and felixfontein authored Sep 16, 2021
1 parent b20fc7a commit 0634583
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions plugins/callback/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
- The service name resource attribute.
env:
- name: OTEL_SERVICE_NAME
traceparent:
default: None
type: str
description:
- The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header).
env:
- name: TRACEPARENT
requirements:
- opentelemetry-api (python lib)
- opentelemetry-exporter-otlp (python lib)
Expand Down Expand Up @@ -64,9 +71,11 @@

try:
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
Expand Down Expand Up @@ -151,6 +160,11 @@ def __init__(self, display):

self._display = display

def traceparent_context(self, traceparent):
carrier = dict()
carrier['traceparent'] = traceparent
return TraceContextTextMapPropagator().extract(carrier=carrier)

def start_task(self, tasks_data, hide_task_arguments, play_name, task):
""" record the start of a task for one or more hosts """

Expand Down Expand Up @@ -188,7 +202,7 @@ def finish_task(self, tasks_data, status, result):

task.add_host(HostData(host_uuid, host_name, status, result))

def generate_distributed_traces(self, otel_service_name, ansible_playbook, tasks_data, status):
def generate_distributed_traces(self, otel_service_name, ansible_playbook, tasks_data, status, traceparent):
""" generate distributed traces from the collected TaskData and HostData """

tasks = []
Expand All @@ -210,7 +224,8 @@ def generate_distributed_traces(self, otel_service_name, ansible_playbook, tasks

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span(ansible_playbook, start_time=parent_start_time) as parent:
with tracer.start_as_current_span(ansible_playbook, context=self.traceparent_context(traceparent),
start_time=parent_start_time, kind=SpanKind.SERVER) as parent:
parent.set_status(status)
# Populate trace metadata attributes
if self.ansible_version is not None:
Expand Down Expand Up @@ -244,7 +259,9 @@ def update_span_data(self, task_data, host_data, span):
message = res['msg']
else:
message = 'failed'
status = Status(status_code=StatusCode.ERROR)
status = Status(status_code=StatusCode.ERROR, description=message)
# Record an exception with the task message
span.record_exception(BaseException(message))
elif host_data.status == 'skipped':
if 'skip_reason' in res:
message = res['skip_reason']
Expand Down Expand Up @@ -291,6 +308,7 @@ def __init__(self, display=None):
self.tasks_data = None
self.errors = 0
self.disabled = False
self.traceparent = False

if OTEL_LIBRARY_IMPORT_ERROR:
raise_from(
Expand Down Expand Up @@ -318,6 +336,9 @@ def set_options(self, task_keys=None, var_options=None, direct=None):
if not self.otel_service_name:
self.otel_service_name = 'ansible'

# See https://github.com/open-telemetry/opentelemetry-specification/issues/740
self.traceparent = self.get_option('traceparent')

def v2_playbook_on_start(self, playbook):
self.ansible_playbook = basename(playbook._file_name)

Expand Down Expand Up @@ -394,7 +415,8 @@ def v2_playbook_on_stats(self, stats):
self.otel_service_name,
self.ansible_playbook,
self.tasks_data,
status
status,
self.traceparent
)

def v2_runner_on_async_failed(self, result, **kwargs):
Expand Down

0 comments on commit 0634583

Please sign in to comment.