Skip to content

Commit

Permalink
trace parts of osbs-client calls
Browse files Browse the repository at this point in the history
expecially tasks with http requests

Signed-off-by: Harsh Modi <hmodi@redhat.com>
  • Loading branch information
hjmodi committed Sep 12, 2023
1 parent c9b1366 commit 00c9fcd
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 68 deletions.
37 changes: 16 additions & 21 deletions osbs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from osbs.utils.labels import Labels
# import utils in this way, so that we can mock standalone functions with flexmock
from osbs import utils
from osbs.utils.otel import get_current_traceparent, init_otel
from osbs.utils.otel import get_current_traceparent


def _load_pipeline_from_template(pipeline_run_path, substitutions):
Expand Down Expand Up @@ -255,8 +255,6 @@ def _get_binary_container_pipeline_data(self, *, buildtime_limit, user_params,

@osbsapi
def create_binary_container_build(self, **kwargs):
init_otel(otel_url=self.os_conf.get_otel_url(),
traceparent=kwargs.get('traceparent', None))
return self.create_binary_container_pipeline_run(**kwargs)

@osbsapi
Expand Down Expand Up @@ -302,14 +300,13 @@ def create_binary_container_pipeline_run(self,

req_labels = self._check_labels(repo_info)

if 'traceparent' in kwargs:
# we are effectively making current span as parent here
# if the traceparent is not updated then child call will
# be linked to the parent of current call
traceparent = get_current_traceparent()
kwargs.update({
'traceparent': traceparent
})
# we are effectively making current span as parent here
# if the traceparent is not updated then child call will
# be linked to the parent of current call
traceparent = get_current_traceparent()
kwargs.update({
'traceparent': traceparent
})

user_params = self.get_user_params(
base_image=repo_info.base_image,
Expand Down Expand Up @@ -363,8 +360,6 @@ def _get_source_container_pipeline_data(self, *, user_params, pipeline_run_name)

@osbsapi
def create_source_container_build(self, **kwargs):
init_otel(otel_url=self.os_conf.get_otel_url(),
traceparent=kwargs.get('traceparent', None))
return self.create_source_container_pipeline_run(**kwargs)

@osbsapi
Expand All @@ -386,14 +381,14 @@ def create_source_container_pipeline_run(self,
if error_messages:
raise OsbsValidationException(", ".join(error_messages))

if 'traceparent' in kwargs:
# we are effectively making current span as parent here
# if the traceparent is not updated then child call will
# be linked to the parent of current call
traceparent = get_current_traceparent()
kwargs.update({
'traceparent': traceparent
})
# we are effectively making current span as parent here
# if the traceparent is not updated then child call will
# be linked to the parent of current call
traceparent = get_current_traceparent()
kwargs.update({
'traceparent': traceparent
})

user_params = SourceContainerUserParams.make_params(
build_conf=self.os_conf,
component=component,
Expand Down
119 changes: 73 additions & 46 deletions osbs/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

import sys
import argparse

from opentelemetry.sdk.environment_variables import OTEL_SERVICE_NAME
from otel_extensions import get_tracer

from osbs import set_logging
from osbs.api import OSBS
from osbs.conf import Configuration
Expand All @@ -22,6 +26,7 @@
from osbs.exceptions import (OsbsNetworkException, OsbsException, OsbsAuthException,
OsbsResponseException)
from osbs.utils import UserWarningsStore
from osbs.utils.otel import init_otel

logger = logging.getLogger('osbs')

Expand All @@ -35,23 +40,26 @@ def _print_pipeline_run_logs(pipeline_run, user_warnings_store):
"""
pipeline_run_name = pipeline_run.pipeline_run_name

pipeline_run_logs = pipeline_run.get_logs(follow=True, wait=True)
if not isinstance(pipeline_run_logs, collections.abc.Iterable):
logger.error("'%s' is not iterable; can't display logs", pipeline_run_name)
return False
print(f"Pipeline run created ({pipeline_run_name}), watching logs (feel free to interrupt)")
try:
for _, line in pipeline_run_logs:
if user_warnings_store.is_user_warning(line):
user_warnings_store.store(line)
continue
span_name = 'get_pipeline_logs'
tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME)
with tracer.start_as_current_span(span_name):
pipeline_run_logs = pipeline_run.get_logs(follow=True, wait=True)
if not isinstance(pipeline_run_logs, collections.abc.Iterable):
logger.error("'%s' is not iterable; can't display logs", pipeline_run_name)
return False
print(f"Pipeline run created ({pipeline_run_name}), watching logs (feel free to interrupt)")
try:
for _, line in pipeline_run_logs:
if user_warnings_store.is_user_warning(line):
user_warnings_store.store(line)
continue

print('{!r}'.format(line))
return True
except Exception as ex:
logger.error("Error during fetching logs for pipeline run %s: %s",
pipeline_run_name, repr(ex))
return False
print('{!r}'.format(line))
return True
except Exception as ex:
logger.error("Error during fetching logs for pipeline run %s: %s",
pipeline_run_name, repr(ex))
return False


def _get_build_metadata(pipeline_run, user_warnings_store):
Expand Down Expand Up @@ -139,21 +147,30 @@ def cmd_build(args):
if osbs.os_conf.get_flatpak():
build_kwargs['flatpak'] = True

pipeline_run = osbs.create_binary_container_pipeline_run(**build_kwargs)

print_output(pipeline_run, export_metadata_file=args.export_metadata_file)

return_val = -1

if pipeline_run.has_succeeded():
return_val = 0
cleanup_used_resources = osbs.os_conf.get_cleanup_used_resources()
if cleanup_used_resources and pipeline_run.data is not None:
try:
logger.info("pipeline run removed: %s", pipeline_run.remove_pipeline_run())
except OsbsResponseException:
logger.error("failed to remove pipeline run %s", pipeline_run.pipeline_run_name)
raise
init_otel(otel_url=os_conf.get_otel_url(),
traceparent=args.traceparent)
span_name = 'binary_build'
tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME)
with tracer.start_as_current_span(span_name) as span:
for k, v in build_kwargs.items():
if not v:
continue
span.set_attribute(k, v)
pipeline_run = osbs.create_binary_container_pipeline_run(**build_kwargs)

print_output(pipeline_run, export_metadata_file=args.export_metadata_file)

return_val = -1

if pipeline_run.has_succeeded():
return_val = 0
cleanup_used_resources = osbs.os_conf.get_cleanup_used_resources()
if cleanup_used_resources and pipeline_run.data is not None:
try:
logger.info("pipeline run removed: %s", pipeline_run.remove_pipeline_run())
except OsbsResponseException:
logger.error("failed to remove pipeline run %s", pipeline_run.pipeline_run_name)
raise
return return_val


Expand Down Expand Up @@ -181,21 +198,30 @@ def cmd_build_source_container(args):
if args.traceparent:
build_kwargs['traceparent'] = args.traceparent

pipeline_run = osbs.create_source_container_pipeline_run(**build_kwargs)

print_output(pipeline_run, export_metadata_file=args.export_metadata_file)

return_val = -1

if pipeline_run.has_succeeded():
return_val = 0
cleanup_used_resources = osbs.os_conf.get_cleanup_used_resources()
if cleanup_used_resources and pipeline_run.data is not None:
try:
logger.info("pipeline run removed: %s", pipeline_run.remove_pipeline_run())
except OsbsResponseException:
logger.error("failed to remove pipeline run %s", pipeline_run.pipeline_run_name)
raise
init_otel(otel_url=os_conf.get_otel_url(),
traceparent=args.traceparent)
span_name = 'source_build'
tracer = get_tracer(module_name=span_name, service_name=OTEL_SERVICE_NAME)
with tracer.start_as_current_span(span_name) as span:
for k, v in build_kwargs.items():
if not v:
continue
span.set_attribute(k, v)
pipeline_run = osbs.create_source_container_pipeline_run(**build_kwargs)

print_output(pipeline_run, export_metadata_file=args.export_metadata_file)

return_val = -1

if pipeline_run.has_succeeded():
return_val = 0
cleanup_used_resources = osbs.os_conf.get_cleanup_used_resources()
if cleanup_used_resources and pipeline_run.data is not None:
try:
logger.info("pipeline run removed: %s", pipeline_run.remove_pipeline_run())
except OsbsResponseException:
logger.error("failed to remove pipeline run %s", pipeline_run.pipeline_run_name)
raise
return return_val


Expand Down Expand Up @@ -426,6 +452,7 @@ def main():
set_logging(level=logging.INFO)

return_value = -1

try:
return_value = args.func(args)
except AttributeError:
Expand Down
2 changes: 2 additions & 0 deletions osbs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
# number of seconds to wait, before retrying on openshift not found
OS_NOT_FOUND_MAX_WAIT = 1

OTEL_SERVICE_NAME = "osbs"

ISOLATED_RELEASE_FORMAT = re.compile(r'^\d+\.\d+(\..+)?$')
RELEASE_LABEL_FORMAT = re.compile(r"""^\d+ # First character must be a digit
([._]? # allow separators between groups
Expand Down
8 changes: 8 additions & 0 deletions osbs/tekton.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Dict, List, Tuple, Callable, Any
from datetime import datetime

from otel_extensions import instrumented

from osbs.exceptions import OsbsResponseException, OsbsAuthException, OsbsException
from osbs.constants import (DEFAULT_NAMESPACE, SERVICEACCOUNT_SECRET, SERVICEACCOUNT_TOKEN,
Expand Down Expand Up @@ -422,6 +423,7 @@ def pipeline_run_url(self):
)
return self._pipeline_run_url

@instrumented
def start_pipeline_run(self):
if not self.input_data:
raise OsbsException("No input data provided for pipeline run to start")
Expand All @@ -445,6 +447,7 @@ def start_pipeline_run(self):
)
return response.json()

@instrumented
def remove_pipeline_run(self):
url = self.os.build_url(
self.api_path,
Expand All @@ -458,6 +461,7 @@ def remove_pipeline_run(self):
return response.json()

@retry_on_conflict
@instrumented
def cancel_pipeline_run(self):
data = copy.deepcopy(self.minimal_data)
data['spec']['status'] = 'CancelledRunFinally'
Expand All @@ -479,6 +483,7 @@ def cancel_pipeline_run(self):
raise OsbsException(exc_msg)
return response_json

@instrumented
def get_info(self, wait=False):
if wait:
self.wait_for_start()
Expand Down Expand Up @@ -731,6 +736,7 @@ def load_result(result: Dict[str, str]) -> Tuple[str, Any]:
name: value for name, value in map(load_result, pipeline_results) if value is not None
}

@instrumented
def wait_for_start(self):
"""
https://tekton.dev/docs/pipelines/pipelineruns/#monitoring-execution-status
Expand Down Expand Up @@ -883,6 +889,7 @@ def get_logs(self, follow=False, wait=False):
pod = Pod(os=self.os, pod_name=pod_name, containers=containers)
return pod.get_logs(follow=follow, wait=wait)

@instrumented
def wait_for_start(self):
"""
https://tekton.dev/docs/pipelines/taskruns/#monitoring-execution-status
Expand Down Expand Up @@ -1033,6 +1040,7 @@ def _stream_logs(self, container):
logger.debug("Fetching logs starting from %ds ago", since)
kwargs['sinceSeconds'] = since

@instrumented
def wait_for_start(self):
logger.info("Waiting for pod to start '%s'", self.pod_name)
for pod in self.os.watch_resource(
Expand Down
7 changes: 6 additions & 1 deletion osbs/utils/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


def init_otel(otel_url: Optional[str], traceparent: Optional[str]):
logger.info("Initializing otel with traceparent %s", traceparent)
span_exporter = ''
otel_protocol = 'http/protobuf'
if not otel_url:
Expand All @@ -35,11 +36,15 @@ def init_otel(otel_url: Optional[str], traceparent: Optional[str]):
OTEL_EXPORTER_OTLP_PROTOCOL=otel_protocol,
)
init_telemetry_provider(otel_options)
if 'TRACEPARENT' in os.environ:
del os.environ['TRACEPARENT']
RequestsInstrumentor().instrument()
logger.info("Initialization complete")


def get_current_traceparent(**kwargs):
def get_current_traceparent():
tracecontext = trace.get_current_span().get_span_context()
traceparent = (f'00-{format_trace_id(tracecontext.trace_id)}-'
f'{format_span_id(tracecontext.span_id)}-01')
logger.info("current traceparent is %s", traceparent)
return traceparent

0 comments on commit 00c9fcd

Please sign in to comment.