Skip to content

Commit

Permalink
opentelemetry: trace parts of osbs-client calls
Browse files Browse the repository at this point in the history
expecially tasks with http requests.

This will allow us to get traces for all important
HTTP calls. We are also collecting all args provided
by the user for the build

* MMENG-3834

Signed-off-by: Harsh Modi <hmodi@redhat.com>
  • Loading branch information
hjmodi committed Sep 13, 2023
1 parent f2bf60f commit ff62daf
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
26 changes: 23 additions & 3 deletions osbs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
from typing import Any, Dict
from string import Template

from otel_extensions import get_tracer

from osbs.build.user_params import (
BuildUserParams,
SourceContainerUserParams
)
from osbs.constants import (RELEASE_LABEL_FORMAT, VERSION_LABEL_FORBIDDEN_CHARS,
ISOLATED_RELEASE_FORMAT)
ISOLATED_RELEASE_FORMAT, OTEL_SERVICE_NAME)
from osbs.tekton import Openshift, PipelineRun
from osbs.exceptions import (OsbsException, OsbsValidationException, OsbsResponseException)
from osbs.utils.labels import Labels
# import utils in this way, so that we can mock standalone functions with flexmock
from osbs import utils
from osbs.utils.otel import get_current_traceparent, init_otel


def _load_pipeline_from_template(pipeline_run_path, substitutions):
Expand Down Expand Up @@ -81,6 +84,7 @@ class OSBS(object):
def __init__(self, openshift_configuration):
""" """
self.os_conf = openshift_configuration
init_otel(self.os_conf.get_otel_url(), traceparent=get_current_traceparent())
self.os = Openshift(openshift_api_url=self.os_conf.get_openshift_base_uri(),
openshift_oauth_url=self.os_conf.get_openshift_oauth_api_uri(),
k8s_api_url=self.os_conf.get_k8s_api_uri(),
Expand Down Expand Up @@ -254,7 +258,15 @@ def _get_binary_container_pipeline_data(self, *, buildtime_limit, user_params,

@osbsapi
def create_binary_container_build(self, **kwargs):
return self.create_binary_container_pipeline_run(**kwargs)
span_name = 'binary_build'
tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME)
with tracer.start_as_current_span(span_name) as span:
for k, v in kwargs.items():
if not v:
continue
span.set_attribute(k, v)
result = self.create_binary_container_pipeline_run(**kwargs)
return result

@osbsapi
def create_binary_container_pipeline_run(self,
Expand Down Expand Up @@ -350,7 +362,15 @@ def _get_source_container_pipeline_data(self, *, user_params, pipeline_run_name)

@osbsapi
def create_source_container_build(self, **kwargs):
return self.create_source_container_pipeline_run(**kwargs)
span_name = 'source_build'
tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME)
with tracer.start_as_current_span(span_name) as span:
for k, v in kwargs.items():
if not v:
continue
span.set_attribute(k, v)
result = self.create_source_container_pipeline_run(**kwargs)
return result

@osbsapi
def create_source_container_pipeline_run(self,
Expand Down
4 changes: 2 additions & 2 deletions osbs/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def cmd_build(args):
if osbs.os_conf.get_flatpak():
build_kwargs['flatpak'] = True

pipeline_run = osbs.create_binary_container_pipeline_run(**build_kwargs)
pipeline_run = osbs.create_binary_container_build(**build_kwargs)

print_output(pipeline_run, export_metadata_file=args.export_metadata_file)

Expand Down Expand Up @@ -177,7 +177,7 @@ def cmd_build_source_container(args):
if args.userdata:
build_kwargs['userdata'] = json.loads(args.userdata)

pipeline_run = osbs.create_source_container_pipeline_run(**build_kwargs)
pipeline_run = osbs.create_source_container_build(**build_kwargs)

print_output(pipeline_run, export_metadata_file=args.export_metadata_file)

Expand Down
2 changes: 2 additions & 0 deletions osbs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
# number of seconds to wait, before retrying on openshift not found
OS_NOT_FOUND_MAX_WAIT = 1

OTEL_SERVICE_NAME = "osbs"

ISOLATED_RELEASE_FORMAT = re.compile(r'^\d+\.\d+(\..+)?$')
RELEASE_LABEL_FORMAT = re.compile(r"""^\d+ # First character must be a digit
([._]? # allow separators between groups
Expand Down
12 changes: 12 additions & 0 deletions osbs/tekton.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Dict, List, Tuple, Callable, Any
from datetime import datetime

from otel_extensions import instrumented

from osbs.exceptions import OsbsResponseException, OsbsAuthException, OsbsException
from osbs.constants import (DEFAULT_NAMESPACE, SERVICEACCOUNT_SECRET, SERVICEACCOUNT_TOKEN,
Expand Down Expand Up @@ -422,6 +423,7 @@ def pipeline_run_url(self):
)
return self._pipeline_run_url

@instrumented
def start_pipeline_run(self):
if not self.input_data:
raise OsbsException("No input data provided for pipeline run to start")
Expand All @@ -445,6 +447,7 @@ def start_pipeline_run(self):
)
return response.json()

@instrumented
def remove_pipeline_run(self):
url = self.os.build_url(
self.api_path,
Expand All @@ -458,6 +461,7 @@ def remove_pipeline_run(self):
return response.json()

@retry_on_conflict
@instrumented
def cancel_pipeline_run(self):
data = copy.deepcopy(self.minimal_data)
data['spec']['status'] = 'CancelledRunFinally'
Expand All @@ -479,6 +483,7 @@ def cancel_pipeline_run(self):
raise OsbsException(exc_msg)
return response_json

@instrumented
def get_info(self, wait=False):
if wait:
self.wait_for_start()
Expand Down Expand Up @@ -672,6 +677,7 @@ def matches_state(task_run: Dict[str, Any]) -> bool:
task_runs = self.data['status'].get('taskRuns', {}).values()
return any(matches_state(tr) for tr in task_runs)

@instrumented
def wait_for_finish(self):
"""
use this method after reading logs finished, to ensure that pipeline run finished,
Expand Down Expand Up @@ -731,6 +737,7 @@ def load_result(result: Dict[str, str]) -> Tuple[str, Any]:
name: value for name, value in map(load_result, pipeline_results) if value is not None
}

@instrumented
def wait_for_start(self):
"""
https://tekton.dev/docs/pipelines/pipelineruns/#monitoring-execution-status
Expand Down Expand Up @@ -764,6 +771,7 @@ def wait_for_start(self):
logger.debug("Waiting for pipeline run, current status %s, reason %s",
status, reason)

@instrumented
def wait_for_taskruns(self):
"""
This generator method watches new task runs in a pipeline run
Expand Down Expand Up @@ -857,6 +865,7 @@ def __init__(self, os, task_run_name):
self.api_path = 'apis'
self.api_version = API_VERSION

@instrumented
def get_info(self, wait=False):
if wait:
self.wait_for_start()
Expand All @@ -883,6 +892,7 @@ def get_logs(self, follow=False, wait=False):
pod = Pod(os=self.os, pod_name=pod_name, containers=containers)
return pod.get_logs(follow=follow, wait=wait)

@instrumented
def wait_for_start(self):
"""
https://tekton.dev/docs/pipelines/taskruns/#monitoring-execution-status
Expand Down Expand Up @@ -922,6 +932,7 @@ def __init__(self, os, pod_name, containers=None):
self.api_version = 'v1'
self.api_path = 'api'

@instrumented
def get_info(self, wait=False):
if wait:
self.wait_for_start()
Expand Down Expand Up @@ -1033,6 +1044,7 @@ def _stream_logs(self, container):
logger.debug("Fetching logs starting from %ds ago", since)
kwargs['sinceSeconds'] = since

@instrumented
def wait_for_start(self):
logger.info("Waiting for pod to start '%s'", self.pod_name)
for pod in self.os.watch_resource(
Expand Down
37 changes: 37 additions & 0 deletions osbs/utils/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,49 @@
This software may be modified and distributed under the terms
of the BSD license. See the LICENSE file for details.
"""
import logging
import os
from typing import Optional

from opentelemetry import trace
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.trace import format_trace_id, format_span_id
from otel_extensions import TelemetryOptions, init_telemetry_provider

from osbs.constants import OTEL_SERVICE_NAME

logger = logging.getLogger(__name__)


def init_otel(otel_url: Optional[str], traceparent: Optional[str]):
logger.info("Initializing otel with traceparent %s", traceparent)
span_exporter = ''
otel_protocol = 'http/protobuf'
if not otel_url:
otel_protocol = 'custom'
span_exporter = '"opentelemetry.sdk.trace.export.ConsoleSpanExporter"'

if traceparent:
os.environ['TRACEPARENT'] = traceparent
otel_options = TelemetryOptions(
OTEL_SERVICE_NAME=OTEL_SERVICE_NAME,
OTEL_EXPORTER_CUSTOM_SPAN_EXPORTER_TYPE=span_exporter,
OTEL_EXPORTER_OTLP_ENDPOINT=otel_url,
OTEL_EXPORTER_OTLP_PROTOCOL=otel_protocol,
)
init_telemetry_provider(otel_options)
if 'TRACEPARENT' in os.environ:
del os.environ['TRACEPARENT']
RequestsInstrumentor().instrument()
logger.info("Initialization complete")


def get_current_traceparent():
tracecontext = trace.get_current_span().get_span_context()
traceparent = (f'00-{format_trace_id(tracecontext.trace_id)}-'
f'{format_span_id(tracecontext.span_id)}-01')
logger.info("current traceparent is %s", traceparent)
none_traceparent = '00-00000000000000000000000000000000-0000000000000000-01'
if traceparent == none_traceparent:
return None
return traceparent

0 comments on commit ff62daf

Please sign in to comment.