Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce OpenTelemetry Tracing decorators #1257

Merged
merged 6 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions google/cloud/storage/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.cloud import _http
from google.cloud.storage import __version__
from google.cloud.storage import _helpers
from google.cloud.storage._opentelemetry_tracing import create_span


class Connection(_http.JSONConnection):
Expand Down Expand Up @@ -65,14 +66,25 @@ def __init__(self, client, client_info=None, api_endpoint=None):

def api_request(self, *args, **kwargs):
retry = kwargs.pop("retry", None)
kwargs["extra_api_info"] = _helpers._get_invocation_id()
invocation_id = _helpers._get_invocation_id()
kwargs["extra_api_info"] = invocation_id
span_attributes = {
"gccl-invocation-id": invocation_id,
}
call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
if retry:
# If this is a ConditionalRetryPolicy, check conditions.
try:
retry = retry.get_retry_policy_if_conditions_met(**kwargs)
except AttributeError: # This is not a ConditionalRetryPolicy.
pass
with create_span(
name="Storage.Connection.api_request",
andrewsg marked this conversation as resolved.
Show resolved Hide resolved
attributes=span_attributes,
client=self._client,
api_request=kwargs,
retry=retry,
):
if retry:
call = retry(call)
return call()
# If this is a ConditionalRetryPolicy, check conditions.
try:
retry = retry.get_retry_policy_if_conditions_met(**kwargs)
except AttributeError: # This is not a ConditionalRetryPolicy.
pass
if retry:
call = retry(call)
return call()
100 changes: 100 additions & 0 deletions google/cloud/storage/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Manages OpenTelemetry tracing span creation and handling."""

import logging
import os

from contextlib import contextmanager

from google.api_core import exceptions as api_exceptions
from google.api_core import retry as api_retry
from google.cloud.storage import __version__
from google.cloud.storage.retry import ConditionalRetryPolicy


ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"
_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False

enable_otel_traces = os.environ.get(
ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE
)
logger = logging.getLogger(__name__)

try:
from opentelemetry import trace

HAS_OPENTELEMETRY = True

except ImportError:
logger.debug(
"This service is instrumented using OpenTelemetry. "
"OpenTelemetry or one of its components could not be imported; "
"please add compatible versions of opentelemetry-api and "
"opentelemetry-instrumentation packages in order to get Storage "
"Tracing data."
)
HAS_OPENTELEMETRY = False

_default_attributes = {
"rpc.service": "CloudStorage",
"rpc.system": "http",
"user_agent.original": f"gcloud-python/{__version__}",
}


@contextmanager
def create_span(
name, attributes=None, client=None, api_request=None, retry=None, **kwargs
):
"""Creates a context manager for a new span and set it as the current span
in the configured tracer. If no configuration exists yields None."""
if not HAS_OPENTELEMETRY or not enable_otel_traces:
yield None
andrewsg marked this conversation as resolved.
Show resolved Hide resolved
return

tracer = trace.get_tracer(__name__)
final_attributes = _get_final_attributes(attributes, client, api_request, retry)
# Yield new span.
with tracer.start_as_current_span(
name=name, kind=trace.SpanKind.CLIENT, attributes=final_attributes
) as span:
try:
yield span
except api_exceptions.GoogleAPICallError as error:
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(error)
raise


def _get_final_attributes(attributes=None, client=None, api_request=None, retry=None):
collected_attr = _default_attributes.copy()
if api_request:
collected_attr.update(_set_api_request_attr(api_request, client))
if isinstance(retry, api_retry.Retry):
collected_attr.update(_set_retry_attr(retry))
if isinstance(retry, ConditionalRetryPolicy):
collected_attr.update(
_set_retry_attr(retry.retry_policy, retry.conditional_predicate)
)
if attributes:
collected_attr.update(attributes)
final_attributes = {k: v for k, v in collected_attr.items() if v is not None}
return final_attributes


def _set_api_request_attr(request, client):
attr = {}
if request.get("method"):
attr["http.request.method"] = request.get("method")
if request.get("path"):
path = request.get("path")
full_path = f"{client._connection.API_BASE_URL}{path}"
attr["url.full"] = full_path
if request.get("timeout"):
attr["connect_timeout,read_timeout"] = request.get("timeout")
return attr


def _set_retry_attr(retry, conditional_predicate=None):
predicate = conditional_predicate if conditional_predicate else retry._predicate
retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}"
return {"retry": retry_info}
2 changes: 2 additions & 0 deletions google/cloud/storage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from google.cloud.storage._helpers import _STORAGE_HOST_TEMPLATE
from google.cloud.storage._helpers import _NOW
from google.cloud.storage._helpers import _UTC
from google.cloud.storage._opentelemetry_tracing import create_span

from google.cloud.storage._http import Connection
from google.cloud.storage._signing import (
Expand Down Expand Up @@ -337,6 +338,7 @@ def current_batch(self):
"""
return self._batch_stack.top

@create_span(name="Storage.Client.getServiceAccountEmail")
def get_service_account_email(
self, project=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY
):
Expand Down
6 changes: 5 additions & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ def lint_setup_py(session):
session.run("python", "setup.py", "check", "--restructuredtext", "--strict")


def default(session):
def default(session, install_extras=True):
constraints_path = str(
CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt"
)
# Install all test dependencies, then install this package in-place.
session.install("mock", "pytest", "pytest-cov", "-c", constraints_path)

if install_extras:
session.install("opentelemetry-api", "opentelemetry-sdk")

session.install("-e", ".", "-c", constraints_path)

# Run py.test against the unit tests.
Expand Down
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@
"requests >= 2.18.0, < 3.0.0dev",
"google-crc32c >= 1.0, < 2.0dev",
]
extras = {"protobuf": ["protobuf<5.0.0dev"]}
extras = {
"protobuf": ["protobuf<5.0.0dev"],
"tracing": [
"opentelemetry-api >= 1.1.0",
],
}


# Setup boilerplate below this line.
Expand Down
Loading