diff --git a/osbs/api.py b/osbs/api.py index 186bf534..434f8813 100644 --- a/osbs/api.py +++ b/osbs/api.py @@ -16,17 +16,20 @@ from typing import Any, Dict from string import Template +from otel_extensions import get_tracer + from osbs.build.user_params import ( BuildUserParams, SourceContainerUserParams ) from osbs.constants import (RELEASE_LABEL_FORMAT, VERSION_LABEL_FORBIDDEN_CHARS, - ISOLATED_RELEASE_FORMAT) + ISOLATED_RELEASE_FORMAT, OTEL_SERVICE_NAME) from osbs.tekton import Openshift, PipelineRun from osbs.exceptions import (OsbsException, OsbsValidationException, OsbsResponseException) from osbs.utils.labels import Labels # import utils in this way, so that we can mock standalone functions with flexmock from osbs import utils +from osbs.utils.otel import get_current_traceparent, init_otel def _load_pipeline_from_template(pipeline_run_path, substitutions): @@ -81,6 +84,7 @@ class OSBS(object): def __init__(self, openshift_configuration): """ """ self.os_conf = openshift_configuration + init_otel(self.os_conf.get_otel_url(), traceparent=get_current_traceparent()) self.os = Openshift(openshift_api_url=self.os_conf.get_openshift_base_uri(), openshift_oauth_url=self.os_conf.get_openshift_oauth_api_uri(), k8s_api_url=self.os_conf.get_k8s_api_uri(), @@ -254,7 +258,15 @@ def _get_binary_container_pipeline_data(self, *, buildtime_limit, user_params, @osbsapi def create_binary_container_build(self, **kwargs): - return self.create_binary_container_pipeline_run(**kwargs) + span_name = 'binary_build' + tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME) + with tracer.start_as_current_span(span_name) as span: + for k, v in kwargs.items(): + if not v: + continue + span.set_attribute(k, v) + result = self.create_binary_container_pipeline_run(**kwargs) + return result @osbsapi def create_binary_container_pipeline_run(self, @@ -350,7 +362,15 @@ def _get_source_container_pipeline_data(self, *, user_params, pipeline_run_name) @osbsapi def create_source_container_build(self, **kwargs): - return self.create_source_container_pipeline_run(**kwargs) + span_name = 'source_build' + tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME) + with tracer.start_as_current_span(span_name) as span: + for k, v in kwargs.items(): + if not v: + continue + span.set_attribute(k, v) + result = self.create_source_container_pipeline_run(**kwargs) + return result @osbsapi def create_source_container_pipeline_run(self, diff --git a/osbs/cli/main.py b/osbs/cli/main.py index 03cd7b0c..5c7a9855 100644 --- a/osbs/cli/main.py +++ b/osbs/cli/main.py @@ -137,7 +137,7 @@ def cmd_build(args): if osbs.os_conf.get_flatpak(): build_kwargs['flatpak'] = True - pipeline_run = osbs.create_binary_container_pipeline_run(**build_kwargs) + pipeline_run = osbs.create_binary_container_build(**build_kwargs) print_output(pipeline_run, export_metadata_file=args.export_metadata_file) @@ -177,7 +177,7 @@ def cmd_build_source_container(args): if args.userdata: build_kwargs['userdata'] = json.loads(args.userdata) - pipeline_run = osbs.create_source_container_pipeline_run(**build_kwargs) + pipeline_run = osbs.create_source_container_build(**build_kwargs) print_output(pipeline_run, export_metadata_file=args.export_metadata_file) diff --git a/osbs/constants.py b/osbs/constants.py index 18e003f4..caec35d4 100644 --- a/osbs/constants.py +++ b/osbs/constants.py @@ -74,6 +74,8 @@ # number of seconds to wait, before retrying on openshift not found OS_NOT_FOUND_MAX_WAIT = 1 +OTEL_SERVICE_NAME = "osbs" + ISOLATED_RELEASE_FORMAT = re.compile(r'^\d+\.\d+(\..+)?$') RELEASE_LABEL_FORMAT = re.compile(r"""^\d+ # First character must be a digit ([._]? # allow separators between groups diff --git a/osbs/tekton.py b/osbs/tekton.py index 56a85776..ae46ab64 100644 --- a/osbs/tekton.py +++ b/osbs/tekton.py @@ -14,6 +14,7 @@ from typing import Dict, List, Tuple, Callable, Any from datetime import datetime +from otel_extensions import instrumented from osbs.exceptions import OsbsResponseException, OsbsAuthException, OsbsException from osbs.constants import (DEFAULT_NAMESPACE, SERVICEACCOUNT_SECRET, SERVICEACCOUNT_TOKEN, @@ -422,6 +423,7 @@ def pipeline_run_url(self): ) return self._pipeline_run_url + @instrumented def start_pipeline_run(self): if not self.input_data: raise OsbsException("No input data provided for pipeline run to start") @@ -445,6 +447,7 @@ def start_pipeline_run(self): ) return response.json() + @instrumented def remove_pipeline_run(self): url = self.os.build_url( self.api_path, @@ -458,6 +461,7 @@ def remove_pipeline_run(self): return response.json() @retry_on_conflict + @instrumented def cancel_pipeline_run(self): data = copy.deepcopy(self.minimal_data) data['spec']['status'] = 'CancelledRunFinally' @@ -479,6 +483,7 @@ def cancel_pipeline_run(self): raise OsbsException(exc_msg) return response_json + @instrumented def get_info(self, wait=False): if wait: self.wait_for_start() @@ -672,6 +677,7 @@ def matches_state(task_run: Dict[str, Any]) -> bool: task_runs = self.data['status'].get('taskRuns', {}).values() return any(matches_state(tr) for tr in task_runs) + @instrumented def wait_for_finish(self): """ use this method after reading logs finished, to ensure that pipeline run finished, @@ -731,6 +737,7 @@ def load_result(result: Dict[str, str]) -> Tuple[str, Any]: name: value for name, value in map(load_result, pipeline_results) if value is not None } + @instrumented def wait_for_start(self): """ https://tekton.dev/docs/pipelines/pipelineruns/#monitoring-execution-status @@ -764,6 +771,7 @@ def wait_for_start(self): logger.debug("Waiting for pipeline run, current status %s, reason %s", status, reason) + @instrumented def wait_for_taskruns(self): """ This generator method watches new task runs in a pipeline run @@ -857,6 +865,7 @@ def __init__(self, os, task_run_name): self.api_path = 'apis' self.api_version = API_VERSION + @instrumented def get_info(self, wait=False): if wait: self.wait_for_start() @@ -883,6 +892,7 @@ def get_logs(self, follow=False, wait=False): pod = Pod(os=self.os, pod_name=pod_name, containers=containers) return pod.get_logs(follow=follow, wait=wait) + @instrumented def wait_for_start(self): """ https://tekton.dev/docs/pipelines/taskruns/#monitoring-execution-status @@ -922,6 +932,7 @@ def __init__(self, os, pod_name, containers=None): self.api_version = 'v1' self.api_path = 'api' + @instrumented def get_info(self, wait=False): if wait: self.wait_for_start() @@ -1033,6 +1044,7 @@ def _stream_logs(self, container): logger.debug("Fetching logs starting from %ds ago", since) kwargs['sinceSeconds'] = since + @instrumented def wait_for_start(self): logger.info("Waiting for pod to start '%s'", self.pod_name) for pod in self.os.watch_resource( diff --git a/osbs/utils/otel.py b/osbs/utils/otel.py index ecaf82e4..ec29654d 100644 --- a/osbs/utils/otel.py +++ b/osbs/utils/otel.py @@ -5,12 +5,49 @@ This software may be modified and distributed under the terms of the BSD license. See the LICENSE file for details. """ +import logging +import os +from typing import Optional + from opentelemetry import trace +from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.trace import format_trace_id, format_span_id +from otel_extensions import TelemetryOptions, init_telemetry_provider + +from osbs.constants import OTEL_SERVICE_NAME + +logger = logging.getLogger(__name__) + + +def init_otel(otel_url: Optional[str], traceparent: Optional[str]): + logger.info("Initializing otel with traceparent %s", traceparent) + span_exporter = '' + otel_protocol = 'http/protobuf' + if not otel_url: + otel_protocol = 'custom' + span_exporter = '"opentelemetry.sdk.trace.export.ConsoleSpanExporter"' + + if traceparent: + os.environ['TRACEPARENT'] = traceparent + otel_options = TelemetryOptions( + OTEL_SERVICE_NAME=OTEL_SERVICE_NAME, + OTEL_EXPORTER_CUSTOM_SPAN_EXPORTER_TYPE=span_exporter, + OTEL_EXPORTER_OTLP_ENDPOINT=otel_url, + OTEL_EXPORTER_OTLP_PROTOCOL=otel_protocol, + ) + init_telemetry_provider(otel_options) + if 'TRACEPARENT' in os.environ: + del os.environ['TRACEPARENT'] + RequestsInstrumentor().instrument() + logger.info("Initialization complete") def get_current_traceparent(): tracecontext = trace.get_current_span().get_span_context() traceparent = (f'00-{format_trace_id(tracecontext.trace_id)}-' f'{format_span_id(tracecontext.span_id)}-01') + logger.info("current traceparent is %s", traceparent) + none_traceparent = '00-00000000000000000000000000000000-0000000000000000-01' + if traceparent == none_traceparent: + return None return traceparent