From c2ab0e035b179a919b27c7f50318472f14656e00 Mon Sep 17 00:00:00 2001 From: cojenco Date: Wed, 26 Jun 2024 15:03:27 -0700 Subject: [PATCH] feat: add OpenTelemetry Tracing support as a preview feature (#1288) * feat: introduce OpenTelemetry Tracing decorators (#1257) * feat: introduce OpenTelemetry Tracing decorators * update test coverage * add tests, update fixture * update noxfile, extras; remove print * update unit test * review comments * feat: instrument metadata ops with OTel tracing (#2) (#1267) * feat: instrument metadata ops with Otel tracing * update README plus test * update decorator name per review session * fix typo in readme * feat: OTel tracing media ops initial instrumentation (#1280) * feat: OTel tracing media ops initial instrumentation * use download class name as span name * avoid asserting filtered warnings from otel per https://github.com/open-telemetry/opentelemetry-python/pull/3164 * add copyright and preview note * comments --- README.rst | 55 +++++ google/cloud/storage/_http.py | 30 ++- .../cloud/storage/_opentelemetry_tracing.py | 112 +++++++++ google/cloud/storage/acl.py | 5 + google/cloud/storage/blob.py | 101 ++++++-- google/cloud/storage/bucket.py | 22 ++ google/cloud/storage/client.py | 28 ++- google/cloud/storage/hmac_key.py | 5 + google/cloud/storage/notification.py | 5 + noxfile.py | 6 +- setup.py | 7 +- tests/unit/test__opentelemetry_tracing.py | 223 ++++++++++++++++++ tests/unit/test_blob.py | 18 +- 13 files changed, 577 insertions(+), 40 deletions(-) create mode 100644 google/cloud/storage/_opentelemetry_tracing.py create mode 100644 tests/unit/test__opentelemetry_tracing.py diff --git a/README.rst b/README.rst index 9eef57645..32d66a1db 100644 --- a/README.rst +++ b/README.rst @@ -115,6 +115,61 @@ Windows .\\Scripts\activate pip install google-cloud-storage + +Tracing With OpenTelemetry +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This is a PREVIEW FEATURE: Coverage and functionality are still in development and subject to change. 
+ +This library can be configured to use `OpenTelemetry`_ to generate traces on calls to Google Cloud Storage. +For information on the benefits and utility of tracing, read the `Cloud Trace Overview <https://cloud.google.com/trace/docs/overview>`_. + +To enable OpenTelemetry tracing in the Cloud Storage client, first install OpenTelemetry: + +.. code-block:: console + + pip install google-cloud-storage[tracing] + +Set the ``ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES`` environment variable to selectively opt in to tracing for the Cloud Storage client: + +.. code-block:: console + + export ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES=True + +You will also need to tell OpenTelemetry which exporter to use. An example to export traces to Google Cloud Trace can be found below. + +.. code-block:: console + + # Install the Google Cloud Trace exporter and propagator, however you can use any exporter of your choice. + pip install opentelemetry-exporter-gcp-trace opentelemetry-propagator-gcp + + # [Optional] Install the OpenTelemetry Requests Instrumentation to trace the underlying HTTP requests. + pip install opentelemetry-instrumentation-requests + +.. code-block:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(BatchSpanProcessor(CloudTraceSpanExporter())) + trace.set_tracer_provider(tracer_provider) + + # Optional yet recommended to instrument the requests HTTP library + from opentelemetry.instrumentation.requests import RequestsInstrumentor + RequestsInstrumentor().instrument(tracer_provider=tracer_provider) + +In this example, tracing data will be published to the `Google Cloud Trace`_ console. +Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request. 
+For a list of libraries that can be instrumented, refer to the `OpenTelemetry Registry`_. + +.. _OpenTelemetry: https://opentelemetry.io +.. _OpenTelemetry Registry: https://opentelemetry.io/ecosystem/registry +.. _Google Cloud Trace: https://cloud.google.com/trace + + Next Steps ~~~~~~~~~~ diff --git a/google/cloud/storage/_http.py b/google/cloud/storage/_http.py index b4e16ebe4..aea13cc57 100644 --- a/google/cloud/storage/_http.py +++ b/google/cloud/storage/_http.py @@ -18,6 +18,7 @@ from google.cloud import _http from google.cloud.storage import __version__ from google.cloud.storage import _helpers +from google.cloud.storage._opentelemetry_tracing import create_trace_span class Connection(_http.JSONConnection): @@ -65,14 +66,25 @@ def __init__(self, client, client_info=None, api_endpoint=None): def api_request(self, *args, **kwargs): retry = kwargs.pop("retry", None) - kwargs["extra_api_info"] = _helpers._get_invocation_id() + invocation_id = _helpers._get_invocation_id() + kwargs["extra_api_info"] = invocation_id + span_attributes = { + "gccl-invocation-id": invocation_id, + } call = functools.partial(super(Connection, self).api_request, *args, **kwargs) - if retry: - # If this is a ConditionalRetryPolicy, check conditions. - try: - retry = retry.get_retry_policy_if_conditions_met(**kwargs) - except AttributeError: # This is not a ConditionalRetryPolicy. - pass + with create_trace_span( + name="Storage.Connection.api_request", + attributes=span_attributes, + client=self._client, + api_request=kwargs, + retry=retry, + ): if retry: - call = retry(call) - return call() + # If this is a ConditionalRetryPolicy, check conditions. + try: + retry = retry.get_retry_policy_if_conditions_met(**kwargs) + except AttributeError: # This is not a ConditionalRetryPolicy. 
+ pass + if retry: + call = retry(call) + return call() diff --git a/google/cloud/storage/_opentelemetry_tracing.py b/google/cloud/storage/_opentelemetry_tracing.py new file mode 100644 index 000000000..ac4c43e07 --- /dev/null +++ b/google/cloud/storage/_opentelemetry_tracing.py @@ -0,0 +1,112 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Manages OpenTelemetry tracing span creation and handling. This is a PREVIEW FEATURE: Coverage and functionality may change.""" + +import logging +import os + +from contextlib import contextmanager + +from google.api_core import exceptions as api_exceptions +from google.api_core import retry as api_retry +from google.cloud.storage import __version__ +from google.cloud.storage.retry import ConditionalRetryPolicy + + +ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES" +_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False + +enable_otel_traces = os.environ.get( + ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE +) +logger = logging.getLogger(__name__) + +try: + from opentelemetry import trace + + HAS_OPENTELEMETRY = True + +except ImportError: + logger.debug( + "This service is instrumented using OpenTelemetry. " + "OpenTelemetry or one of its components could not be imported; " + "please add compatible versions of opentelemetry-api and " + "opentelemetry-instrumentation packages in order to get Storage " + "Tracing data." 
+ ) + HAS_OPENTELEMETRY = False + +_default_attributes = { + "rpc.service": "CloudStorage", + "rpc.system": "http", + "user_agent.original": f"gcloud-python/{__version__}", +} + + +@contextmanager +def create_trace_span(name, attributes=None, client=None, api_request=None, retry=None): + """Creates a context manager for a new span and set it as the current span + in the configured tracer. If no configuration exists yields None.""" + if not HAS_OPENTELEMETRY or not enable_otel_traces: + yield None + return + + tracer = trace.get_tracer(__name__) + final_attributes = _get_final_attributes(attributes, client, api_request, retry) + # Yield new span. + with tracer.start_as_current_span( + name=name, kind=trace.SpanKind.CLIENT, attributes=final_attributes + ) as span: + try: + yield span + except api_exceptions.GoogleAPICallError as error: + span.set_status(trace.Status(trace.StatusCode.ERROR)) + span.record_exception(error) + raise + + +def _get_final_attributes(attributes=None, client=None, api_request=None, retry=None): + collected_attr = _default_attributes.copy() + if api_request: + collected_attr.update(_set_api_request_attr(api_request, client)) + if isinstance(retry, api_retry.Retry): + collected_attr.update(_set_retry_attr(retry)) + if isinstance(retry, ConditionalRetryPolicy): + collected_attr.update( + _set_retry_attr(retry.retry_policy, retry.conditional_predicate) + ) + if attributes: + collected_attr.update(attributes) + final_attributes = {k: v for k, v in collected_attr.items() if v is not None} + return final_attributes + + +def _set_api_request_attr(request, client): + attr = {} + if request.get("method"): + attr["http.request.method"] = request.get("method") + if request.get("path"): + path = request.get("path") + full_path = f"{client._connection.API_BASE_URL}{path}" + attr["url.full"] = full_path + if request.get("timeout"): + attr["connect_timeout,read_timeout"] = request.get("timeout") + return attr + + +def _set_retry_attr(retry, 
conditional_predicate=None): + predicate = conditional_predicate if conditional_predicate else retry._predicate + retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}" + return {"retry": retry_info} diff --git a/google/cloud/storage/acl.py b/google/cloud/storage/acl.py index 1ca78f258..d20ca135b 100644 --- a/google/cloud/storage/acl.py +++ b/google/cloud/storage/acl.py @@ -15,6 +15,7 @@ """Manage access to objects and buckets.""" from google.cloud.storage._helpers import _add_generation_match_parameters +from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED @@ -359,6 +360,7 @@ def _require_client(self, client): client = self.client return client + @create_trace_span(name="Storage.ACL.reload") def reload(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Reload the ACL data from Cloud Storage. 
@@ -484,6 +486,7 @@ def _save( self.loaded = True + @create_trace_span(name="Storage.ACL.save") def save( self, acl=None, @@ -552,6 +555,7 @@ def save( retry=retry, ) + @create_trace_span(name="Storage.ACL.savePredefined") def save_predefined( self, predefined, @@ -617,6 +621,7 @@ def save_predefined( retry=retry, ) + @create_trace_span(name="Storage.ACL.clear") def clear( self, client=None, diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index b0b4a663f..e474f1681 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -63,6 +63,7 @@ from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE from google.cloud.storage._helpers import _API_VERSION from google.cloud.storage._helpers import _virtual_hosted_style_base_url +from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.acl import ACL from google.cloud.storage.acl import ObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -639,6 +640,7 @@ def generate_signed_url( access_token=access_token, ) + @create_trace_span(name="Storage.Blob.exists") def exists( self, client=None, @@ -744,6 +746,7 @@ def exists( return False return True + @create_trace_span(name="Storage.Blob.delete") def delete( self, client=None, @@ -1005,11 +1008,21 @@ def _do_download( retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + extra_attributes = { + "url.full": download_url, + "download.chunk_size": f"{self.chunk_size}", + "download.raw_download": raw_download, + "upload.checksum": f"{checksum}", + } + args = {"timeout": timeout} + if self.chunk_size is None: if raw_download: klass = RawDownload + download_class = "RawDownload" else: klass = Download + download_class = "Download" download = klass( download_url, @@ -1020,8 +1033,13 @@ def _do_download( checksum=checksum, ) download._retry_strategy = retry_strategy - response = download.consume(transport, timeout=timeout) - 
self._extract_headers_from_download(response) + with create_trace_span( + name=f"Storage.{download_class}/consume", + attributes=extra_attributes, + api_request=args, + ): + response = download.consume(transport, timeout=timeout) + self._extract_headers_from_download(response) else: if checksum: msg = _CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format(checksum) @@ -1029,8 +1047,10 @@ def _do_download( if raw_download: klass = RawChunkedDownload + download_class = "RawChunkedDownload" else: klass = ChunkedDownload + download_class = "ChunkedDownload" download = klass( download_url, @@ -1042,9 +1062,15 @@ def _do_download( ) download._retry_strategy = retry_strategy - while not download.finished: - download.consume_next_chunk(transport, timeout=timeout) - + with create_trace_span( + name=f"Storage.{download_class}/consumeNextChunk", + attributes=extra_attributes, + api_request=args, + ): + while not download.finished: + download.consume_next_chunk(transport, timeout=timeout) + + @create_trace_span(name="Storage.Blob.downloadToFile") def download_to_file( self, file_obj, @@ -1207,6 +1233,7 @@ def _handle_filename_and_download(self, filename, *args, **kwargs): mtime = updated.timestamp() os.utime(file_obj.name, (mtime, mtime)) + @create_trace_span(name="Storage.Blob.downloadToFilename") def download_to_filename( self, filename, @@ -1332,6 +1359,7 @@ def download_to_filename( retry=retry, ) + @create_trace_span(name="Storage.Blob.downloadAsBytes") def download_as_bytes( self, client=None, @@ -1456,6 +1484,7 @@ def download_as_bytes( ) return string_buffer.getvalue() + @create_trace_span(name="Storage.Blob.downloadAsString") def download_as_string( self, client=None, @@ -1568,6 +1597,7 @@ def download_as_string( retry=retry, ) + @create_trace_span(name="Storage.Blob.downloadAsText") def download_as_text( self, client=None, @@ -1959,11 +1989,22 @@ def _do_multipart_upload( retry, num_retries ) - response = upload.transmit( - transport, data, object_metadata, content_type, 
timeout=timeout - ) + extra_attributes = { + "url.full": upload_url, + "upload.checksum": f"{checksum}", + } + args = {"timeout": timeout} + with create_trace_span( + name="Storage.MultipartUpload/transmit", + attributes=extra_attributes, + client=client, + api_request=args, + ): + response = upload.transmit( + transport, data, object_metadata, content_type, timeout=timeout + ) - return response + return response def _initiate_resumable_upload( self, @@ -2297,14 +2338,27 @@ def _do_resumable_upload( retry=retry, command=command, ) - while not upload.finished: - try: - response = upload.transmit_next_chunk(transport, timeout=timeout) - except resumable_media.DataCorruption: - # Attempt to delete the corrupted object. - self.delete() - raise - return response + extra_attributes = { + "url.full": upload.resumable_url, + "upload.chunk_size": upload.chunk_size, + "upload.checksum": f"{checksum}", + } + args = {"timeout": timeout} + # import pdb; pdb.set_trace() + with create_trace_span( + name="Storage.ResumableUpload/transmitNextChunk", + attributes=extra_attributes, + client=client, + api_request=args, + ): + while not upload.finished: + try: + response = upload.transmit_next_chunk(transport, timeout=timeout) + except resumable_media.DataCorruption: + # Attempt to delete the corrupted object. 
+ self.delete() + raise + return response def _do_upload( self, @@ -2660,6 +2714,7 @@ def _prep_and_do_upload( except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) + @create_trace_span(name="Storage.Blob.uploadFromFile") def upload_from_file( self, file_obj, @@ -2831,6 +2886,7 @@ def _handle_filename_and_upload(self, filename, content_type=None, *args, **kwar **kwargs, ) + @create_trace_span(name="Storage.Blob.uploadFromFilename") def upload_from_filename( self, filename, @@ -2959,6 +3015,7 @@ def upload_from_filename( retry=retry, ) + @create_trace_span(name="Storage.Blob.uploadFromString") def upload_from_string( self, data, @@ -3081,6 +3138,7 @@ def upload_from_string( retry=retry, ) + @create_trace_span(name="Storage.Blob.createResumableUploadSession") def create_resumable_upload_session( self, content_type=None, @@ -3254,6 +3312,7 @@ def create_resumable_upload_session( except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) + @create_trace_span(name="Storage.Blob.getIamPolicy") def get_iam_policy( self, client=None, @@ -3322,6 +3381,7 @@ def get_iam_policy( ) return Policy.from_api_repr(info) + @create_trace_span(name="Storage.Blob.setIamPolicy") def set_iam_policy( self, policy, @@ -3383,6 +3443,7 @@ def set_iam_policy( ) return Policy.from_api_repr(info) + @create_trace_span(name="Storage.Blob.testIamPermissions") def test_iam_permissions( self, permissions, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): @@ -3437,6 +3498,7 @@ def test_iam_permissions( return resp.get("permissions", []) + @create_trace_span(name="Storage.Blob.makePublic") def make_public( self, client=None, @@ -3490,6 +3552,7 @@ def make_public( retry=retry, ) + @create_trace_span(name="Storage.Blob.makePrivate") def make_private( self, client=None, @@ -3543,6 +3606,7 @@ def make_private( retry=retry, ) + @create_trace_span(name="Storage.Blob.compose") def compose( self, sources, @@ -3682,6 +3746,7 @@ def compose( ) 
self._set_properties(api_response) + @create_trace_span(name="Storage.Blob.rewrite") def rewrite( self, source, @@ -3846,6 +3911,7 @@ def rewrite( return api_response["rewriteToken"], rewritten, size + @create_trace_span(name="Storage.Blob.updateStorageClass") def update_storage_class( self, new_class, @@ -3979,6 +4045,7 @@ def update_storage_class( retry=retry, ) + @create_trace_span(name="Storage.Blob.open") def open( self, mode="r", diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index 7b6421d29..ad1d0de5d 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -38,6 +38,7 @@ from google.cloud.storage._signing import generate_signed_url_v4 from google.cloud.storage._helpers import _bucket_bound_hostname_url from google.cloud.storage._helpers import _virtual_hosted_style_base_url +from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.blob import Blob @@ -827,6 +828,7 @@ def notification( notification_id=notification_id, ) + @create_trace_span(name="Storage.Bucket.exists") def exists( self, client=None, @@ -911,6 +913,7 @@ def exists( return False return True + @create_trace_span(name="Storage.Bucket.create") def create( self, client=None, @@ -986,6 +989,7 @@ def create( retry=retry, ) + @create_trace_span(name="Storage.Bucket.update") def update( self, client=None, @@ -1030,6 +1034,7 @@ def update( retry=retry, ) + @create_trace_span(name="Storage.Bucket.reload") def reload( self, client=None, @@ -1091,6 +1096,7 @@ def reload( retry=retry, ) + @create_trace_span(name="Storage.Bucket.patch") def patch( self, client=None, @@ -1174,6 +1180,7 @@ def path(self): return self.path_helper(self.name) + @create_trace_span(name="Storage.Bucket.getBlob") def get_blob( self, blob_name, @@ -1290,6 +1297,7 @@ def get_blob( else: return blob + 
@create_trace_span(name="Storage.Bucket.listBlobs") def list_blobs( self, max_results=None, @@ -1432,6 +1440,7 @@ def list_blobs( soft_deleted=soft_deleted, ) + @create_trace_span(name="Storage.Bucket.listNotifications") def list_notifications( self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): @@ -1469,6 +1478,7 @@ def list_notifications( iterator.bucket = self return iterator + @create_trace_span(name="Storage.Bucket.getNotification") def get_notification( self, notification_id, @@ -1506,6 +1516,7 @@ def get_notification( notification.reload(client=client, timeout=timeout, retry=retry) return notification + @create_trace_span(name="Storage.Bucket.delete") def delete( self, force=False, @@ -1612,6 +1623,7 @@ def delete( _target_object=None, ) + @create_trace_span(name="Storage.Bucket.deleteBlob") def delete_blob( self, blob_name, @@ -1698,6 +1710,7 @@ def delete_blob( _target_object=None, ) + @create_trace_span(name="Storage.Bucket.deleteBlobs") def delete_blobs( self, blobs, @@ -1818,6 +1831,7 @@ def delete_blobs( else: raise + @create_trace_span(name="Storage.Bucket.copyBlob") def copy_blob( self, blob, @@ -1973,6 +1987,7 @@ def copy_blob( new_blob._set_properties(copy_result) return new_blob + @create_trace_span(name="Storage.Bucket.renameBlob") def rename_blob( self, blob, @@ -2116,6 +2131,7 @@ def rename_blob( ) return new_blob + @create_trace_span(name="Storage.Bucket.restore_blob") def restore_blob( self, blob_name, @@ -3017,6 +3033,7 @@ def disable_website(self): """ return self.configure_website(None, None) + @create_trace_span(name="Storage.Bucket.getIamPolicy") def get_iam_policy( self, client=None, @@ -3079,6 +3096,7 @@ def get_iam_policy( ) return Policy.from_api_repr(info) + @create_trace_span(name="Storage.Bucket.setIamPolicy") def set_iam_policy( self, policy, @@ -3135,6 +3153,7 @@ def set_iam_policy( return Policy.from_api_repr(info) + @create_trace_span(name="Storage.Bucket.testIamPermissions") def test_iam_permissions( self, 
permissions, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): @@ -3182,6 +3201,7 @@ def test_iam_permissions( ) return resp.get("permissions", []) + @create_trace_span(name="Storage.Bucket.makePublic") def make_public( self, recursive=False, @@ -3279,6 +3299,7 @@ def make_public( timeout=timeout, ) + @create_trace_span(name="Storage.Bucket.makePrivate") def make_private( self, recursive=False, @@ -3426,6 +3447,7 @@ def generate_upload_policy(self, conditions, expiration=None, client=None): return fields + @create_trace_span(name="Storage.Bucket.lockRetentionPolicy") def lock_retention_policy( self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index 57bbab008..b21ef7cef 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -41,6 +41,7 @@ from google.cloud.storage._helpers import _STORAGE_HOST_TEMPLATE from google.cloud.storage._helpers import _NOW from google.cloud.storage._helpers import _UTC +from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage._http import Connection from google.cloud.storage._signing import ( @@ -337,6 +338,7 @@ def current_batch(self): """ return self._batch_stack.top + @create_trace_span(name="Storage.Client.getServiceAccountEmail") def get_service_account_email( self, project=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): @@ -481,9 +483,20 @@ def _list_resource( timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): - api_request = functools.partial( - self._connection.api_request, timeout=timeout, retry=retry - ) + kwargs = { + "method": "GET", + "path": path, + "timeout": timeout, + } + with create_trace_span( + name="Storage.Client._list_resource_returns_iterator", + client=self, + api_request=kwargs, + retry=retry, + ): + api_request = functools.partial( + self._connection.api_request, timeout=timeout, retry=retry + ) return page_iterator.HTTPIterator( 
client=self, api_request=api_request, @@ -798,6 +811,7 @@ def _bucket_arg_to_bucket(self, bucket_or_name): bucket = Bucket(self, name=bucket_or_name) return bucket + @create_trace_span(name="Storage.Client.getBucket") def get_bucket( self, bucket_or_name, @@ -863,6 +877,7 @@ def get_bucket( ) return bucket + @create_trace_span(name="Storage.Client.lookupBucket") def lookup_bucket( self, bucket_name, @@ -910,6 +925,7 @@ def lookup_bucket( except NotFound: return None + @create_trace_span(name="Storage.Client.createBucket") def create_bucket( self, bucket_or_name, @@ -1053,6 +1069,7 @@ def create_bucket( bucket._set_properties(api_response) return bucket + @create_trace_span(name="Storage.Client.downloadBlobToFile") def download_blob_to_file( self, blob_or_uri, @@ -1167,6 +1184,7 @@ def download_blob_to_file( retry=retry, ) + @create_trace_span(name="Storage.Client.listBlobs") def list_blobs( self, bucket_or_name, @@ -1356,6 +1374,7 @@ def list_blobs( iterator.prefixes = set() return iterator + @create_trace_span(name="Storage.Client.listBuckets") def list_buckets( self, max_results=None, @@ -1461,6 +1480,7 @@ def list_buckets( retry=retry, ) + @create_trace_span(name="Storage.Client.createHmacKey") def create_hmac_key( self, service_account_email, @@ -1525,6 +1545,7 @@ def create_hmac_key( secret = api_response["secret"] return metadata, secret + @create_trace_span(name="Storage.Client.listHmacKeys") def list_hmac_keys( self, max_results=None, @@ -1594,6 +1615,7 @@ def list_hmac_keys( retry=retry, ) + @create_trace_span(name="Storage.Client.getHmacKeyMetadata") def get_hmac_key_metadata( self, access_id, project_id=None, user_project=None, timeout=_DEFAULT_TIMEOUT ): diff --git a/google/cloud/storage/hmac_key.py b/google/cloud/storage/hmac_key.py index 41f513ec6..d37bc071b 100644 --- a/google/cloud/storage/hmac_key.py +++ b/google/cloud/storage/hmac_key.py @@ -20,6 +20,7 @@ from google.cloud.exceptions import NotFound from google.cloud._helpers import 
_rfc3339_nanos_to_datetime +from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON @@ -187,6 +188,7 @@ def user_project(self): """ return self._user_project + @create_trace_span(name="Storage.HmacKey.exists") def exists(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Determine whether or not the key for this metadata exists. @@ -219,6 +221,7 @@ def exists(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): else: return True + @create_trace_span(name="Storage.HmacKey.reload") def reload(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Reload properties from Cloud Storage. @@ -246,6 +249,7 @@ def reload(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): retry=retry, ) + @create_trace_span(name="Storage.HmacKey.update") def update(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_ETAG_IN_JSON): """Save writable properties to Cloud Storage. @@ -274,6 +278,7 @@ def update(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_ETAG_IN_JSON): retry=retry, ) + @create_trace_span(name="Storage.HmacKey.delete") def delete(self, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Delete the key from Cloud Storage. 
diff --git a/google/cloud/storage/notification.py b/google/cloud/storage/notification.py index 9af476d58..d9d49fc4b 100644 --- a/google/cloud/storage/notification.py +++ b/google/cloud/storage/notification.py @@ -21,6 +21,7 @@ from google.api_core.exceptions import NotFound +from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY @@ -230,6 +231,7 @@ def _set_properties(self, response): self._properties.clear() self._properties.update(response) + @create_trace_span(name="Storage.BucketNotification.create") def create(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=None): """API wrapper: create the notification. @@ -282,6 +284,7 @@ def create(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=None): retry=retry, ) + @create_trace_span(name="Storage.BucketNotification.exists") def exists(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Test whether this notification exists. @@ -329,6 +332,7 @@ def exists(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): else: return True + @create_trace_span(name="Storage.BucketNotification.reload") def reload(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Update this notification from the server configuration. @@ -371,6 +375,7 @@ def reload(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): ) self._set_properties(response) + @create_trace_span(name="Storage.BucketNotification.delete") def delete(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): """Delete this notification. 
diff --git a/noxfile.py b/noxfile.py index fb3d8f89e..319ae207e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -75,12 +75,16 @@ def lint_setup_py(session): session.run("python", "setup.py", "check", "--restructuredtext", "--strict") -def default(session): +def default(session, install_extras=True): constraints_path = str( CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" ) # Install all test dependencies, then install this package in-place. session.install("mock", "pytest", "pytest-cov", "-c", constraints_path) + + if install_extras: + session.install("opentelemetry-api", "opentelemetry-sdk") + session.install("-e", ".", "-c", constraints_path) # Run py.test against the unit tests. diff --git a/setup.py b/setup.py index b2f5e411e..391bf7770 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,12 @@ "requests >= 2.18.0, < 3.0.0dev", "google-crc32c >= 1.0, < 2.0dev", ] -extras = {"protobuf": ["protobuf<5.0.0dev"]} +extras = { + "protobuf": ["protobuf<5.0.0dev"], + "tracing": [ + "opentelemetry-api >= 1.1.0", + ], +} # Setup boilerplate below this line. diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py new file mode 100644 index 000000000..631ac9f82 --- /dev/null +++ b/tests/unit/test__opentelemetry_tracing.py @@ -0,0 +1,223 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import importlib +import os +import pytest +import sys + +import mock +from google.api_core.exceptions import GoogleAPICallError +from google.cloud.storage import __version__ +from google.cloud.storage import _opentelemetry_tracing + + +@pytest.fixture +def setup(): + """Setup OTel packages and tracer provider.""" + try: + from opentelemetry import trace as trace_api + from opentelemetry.sdk.trace import TracerProvider, export + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + except ImportError: # pragma: NO COVER + pytest.skip("This test suite requires OpenTelemetry pacakges.") + + tracer_provider = TracerProvider() + memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + trace_api.set_tracer_provider(tracer_provider) + importlib.reload(_opentelemetry_tracing) + yield memory_exporter + + +@pytest.fixture() +def mock_os_environ(monkeypatch): + """Mock os.environ.""" + monkeypatch.setattr(os, "environ", {}) + return os.environ + + +@pytest.fixture() +def setup_optin(mock_os_environ): + """Mock envar to opt-in tracing for storage client.""" + mock_os_environ["ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"] = True + importlib.reload(_opentelemetry_tracing) + + +def test_opentelemetry_not_installed(setup, monkeypatch): + monkeypatch.setitem(sys.modules, "opentelemetry", None) + importlib.reload(_opentelemetry_tracing) + # Test no-ops when OpenTelemetry is not installed. + with _opentelemetry_tracing.create_trace_span("No-ops w/o opentelemetry") as span: + assert span is None + assert not _opentelemetry_tracing.HAS_OPENTELEMETRY + + +def test_opentelemetry_no_trace_optin(setup): + assert _opentelemetry_tracing.HAS_OPENTELEMETRY + assert not _opentelemetry_tracing.enable_otel_traces + # Test no-ops when user has not opt-in. + # This prevents customers accidentally being billed for tracing. 
+ with _opentelemetry_tracing.create_trace_span("No-ops w/o opt-in") as span: + assert span is None + + +def test_enable_trace_yield_span(setup, setup_optin): + assert _opentelemetry_tracing.HAS_OPENTELEMETRY + assert _opentelemetry_tracing.enable_otel_traces + with _opentelemetry_tracing.create_trace_span("No-ops for opentelemetry") as span: + assert span is not None + + +def test_enable_trace_call(setup, setup_optin): + from opentelemetry import trace as trace_api + + extra_attributes = { + "attribute1": "value1", + } + expected_attributes = { + "rpc.service": "CloudStorage", + "rpc.system": "http", + "user_agent.original": f"gcloud-python/{__version__}", + } + expected_attributes.update(extra_attributes) + + with _opentelemetry_tracing.create_trace_span( + "OtelTracing.Test", attributes=extra_attributes + ) as span: + span.set_attribute("after_setup_attribute", 1) + + expected_attributes["after_setup_attribute"] = 1 + + assert span.kind == trace_api.SpanKind.CLIENT + assert span.attributes == expected_attributes + assert span.name == "OtelTracing.Test" + + +def test_enable_trace_error(setup, setup_optin): + from opentelemetry import trace as trace_api + + extra_attributes = { + "attribute1": "value1", + } + expected_attributes = { + "rpc.service": "CloudStorage", + "rpc.system": "http", + "user_agent.original": f"gcloud-python/{__version__}", + } + expected_attributes.update(extra_attributes) + + with pytest.raises(GoogleAPICallError): + with _opentelemetry_tracing.create_trace_span( + "OtelTracing.Test", attributes=extra_attributes + ) as span: + from google.cloud.exceptions import NotFound + + assert span.kind == trace_api.SpanKind.CLIENT + assert span.attributes == expected_attributes + assert span.name == "OtelTracing.Test" + raise NotFound("Test catching NotFound error in trace span.") + + +def test_get_final_attributes(setup, setup_optin): + from google.api_core import retry as api_retry + + test_span_name = "OtelTracing.Test" + test_span_attributes = { + 
"foo": "bar", + } + api_request = { + "method": "GET", + "path": "/foo/bar/baz", + "timeout": (100, 100), + } + retry_obj = api_retry.Retry() + + expected_attributes = { + "foo": "bar", + "rpc.service": "CloudStorage", + "rpc.system": "http", + "user_agent.original": f"gcloud-python/{__version__}", + "http.request.method": "GET", + "url.full": "https://testOtel.org/foo/bar/baz", + "connect_timeout,read_timeout": (100, 100), + "retry": f"multiplier{retry_obj._multiplier}/deadline{retry_obj._deadline}/max{retry_obj._maximum}/initial{retry_obj._initial}/predicate{retry_obj._predicate}", + } + + with mock.patch("google.cloud.storage.client.Client") as test_client: + test_client.project = "test_project" + test_client._connection.API_BASE_URL = "https://testOtel.org" + with _opentelemetry_tracing.create_trace_span( + test_span_name, + attributes=test_span_attributes, + client=test_client, + api_request=api_request, + retry=retry_obj, + ) as span: + assert span is not None + assert span.name == test_span_name + assert span.attributes == expected_attributes + + +def test_set_conditional_retry_attr(setup, setup_optin): + from google.api_core import retry as api_retry + from google.cloud.storage.retry import ConditionalRetryPolicy + + test_span_name = "OtelTracing.Test" + retry_policy = api_retry.Retry() + conditional_predicate = mock.Mock() + required_kwargs = ("kwarg",) + retry_obj = ConditionalRetryPolicy( + retry_policy, conditional_predicate, required_kwargs + ) + + expected_attributes = { + "rpc.service": "CloudStorage", + "rpc.system": "http", + "user_agent.original": f"gcloud-python/{__version__}", + "retry": f"multiplier{retry_policy._multiplier}/deadline{retry_policy._deadline}/max{retry_policy._maximum}/initial{retry_policy._initial}/predicate{conditional_predicate}", + } + + with _opentelemetry_tracing.create_trace_span( + test_span_name, + retry=retry_obj, + ) as span: + assert span is not None + assert span.name == test_span_name + assert span.attributes == 
expected_attributes + + +def test_set_api_request_attr(): + from google.cloud.storage import Client + + test_client = Client() + args_method = {"method": "GET"} + expected_attributes = {"http.request.method": "GET"} + attr = _opentelemetry_tracing._set_api_request_attr(args_method, test_client) + assert attr == expected_attributes + + args_path = {"path": "/foo/bar/baz"} + expected_attributes = {"url.full": "https://storage.googleapis.com/foo/bar/baz"} + attr = _opentelemetry_tracing._set_api_request_attr(args_path, test_client) + assert attr == expected_attributes + + args_timeout = {"timeout": (100, 100)} + expected_attributes = { + "connect_timeout,read_timeout": (100, 100), + } + attr = _opentelemetry_tracing._set_api_request_attr(args_timeout, test_client) + assert attr == expected_attributes diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 98d744d6c..b0ff4f07b 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -2176,7 +2176,7 @@ def test_download_as_string(self, mock_warn): retry=DEFAULT_RETRY, ) - mock_warn.assert_called_once_with( + mock_warn.assert_any_call( _DOWNLOAD_AS_STRING_DEPRECATED, PendingDeprecationWarning, stacklevel=2, @@ -2214,7 +2214,7 @@ def test_download_as_string_no_retry(self, mock_warn): retry=None, ) - mock_warn.assert_called_once_with( + mock_warn.assert_any_call( _DOWNLOAD_AS_STRING_DEPRECATED, PendingDeprecationWarning, stacklevel=2, @@ -3410,7 +3410,7 @@ def test_upload_from_file_w_num_retries(self, mock_warn): self._upload_from_file_helper(num_retries=2) - mock_warn.assert_called_once_with( + mock_warn.assert_any_call( _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2, @@ -3425,7 +3425,7 @@ def test_upload_from_file_with_retry_conflict(self, mock_warn): # through. 
self._upload_from_file_helper(retry=DEFAULT_RETRY, num_retries=2) - mock_warn.assert_called_once_with( + mock_warn.assert_any_call( _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2, @@ -3595,7 +3595,7 @@ def test_upload_from_filename_w_num_retries(self, mock_warn): self.assertEqual(stream.mode, "rb") self.assertEqual(stream.name, temp.name) - mock_warn.assert_called_once_with( + mock_warn.assert_any_call( _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2, @@ -3686,7 +3686,7 @@ def test_upload_from_string_with_num_retries(self, mock_warn): data = "\N{snowman} \N{sailboat}" self._upload_from_string_helper(data, num_retries=2) - mock_warn.assert_called_once_with( + mock_warn.assert_any_call( _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2, @@ -4642,7 +4642,7 @@ def test_compose_w_if_generation_match_list_w_warning(self, mock_warn): _target_object=destination, ) - mock_warn.assert_called_with( + mock_warn.assert_any_call( _COMPOSE_IF_GENERATION_LIST_DEPRECATED, DeprecationWarning, stacklevel=2, @@ -4672,7 +4672,7 @@ def test_compose_w_if_generation_match_and_if_s_generation_match(self, mock_warn client._post_resource.assert_not_called() - mock_warn.assert_called_with( + mock_warn.assert_any_call( _COMPOSE_IF_GENERATION_LIST_DEPRECATED, DeprecationWarning, stacklevel=2, @@ -4716,7 +4716,7 @@ def test_compose_w_if_metageneration_match_list_w_warning(self, mock_warn): _target_object=destination, ) - mock_warn.assert_called_with( + mock_warn.assert_any_call( _COMPOSE_IF_METAGENERATION_LIST_DEPRECATED, DeprecationWarning, stacklevel=2,