diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 10c648e517..cd729974a1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,6 +9,89 @@ env: CORE_REPO_SHA: 955c92e91b5cd4bcfb43c39efcef086b040471d2 jobs: + build: + env: + # We use these variables to convert between tox and GHA version literals + py37: 3.7 + py38: 3.8 + py39: 3.9 + py310: "3.10" + py311: "3.11" + pypy3: "pypy3.7" + RUN_MATRIX_COMBINATION: ${{ matrix.python-version }}-${{ matrix.package }}-${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false # ensures the entire test matrix is run, even if one permutation fails + matrix: + python-version: [ py37, py38, py39, py310, py311, pypy3 ] + package: ["instrumentation", "distro", "exporter", "sdkextension", "propagator", "resource"] + os: [ ubuntu-20.04 ] + steps: + - name: Checkout Contrib Repo @ SHA - ${{ github.sha }} + uses: actions/checkout@v2 + - name: Set up Python ${{ env[matrix.python-version] }} + uses: actions/setup-python@v4 + with: + python-version: ${{ env[matrix.python-version] }} + - name: Install tox + run: pip install tox==3.27.1 tox-factor + - name: Cache tox environment + # Preserves .tox directory between runs for faster installs + uses: actions/cache@v1 + with: + path: | + .tox + ~/.cache/pip + key: v7-build-tox-cache-${{ env.RUN_MATRIX_COMBINATION }}-${{ hashFiles('tox.ini', 'gen-requirements.txt', 'dev-requirements.txt') }} + - name: run tox + run: tox -f ${{ matrix.python-version }}-${{ matrix.package }} -- --benchmark-json=${{ env.RUN_MATRIX_COMBINATION }}-benchmark.json + # - name: Find and merge ${{ matrix.package }} benchmarks + # # TODO: Add at least one benchmark to every package type to remove this (#249) + # if: matrix.package == 'sdkextension' || matrix.package == 'propagator' + # run: >- + # mkdir -p benchmarks; + # jq -s '.[0].benchmarks = ([.[].benchmarks] | add) + # | if .[0].benchmarks == null then null else .[0] end' + # **/**/tests/*${{ matrix.package }}*-benchmark.json > benchmarks/output_${{ matrix.package }}.json + # - name: Upload all benchmarks under same key as an artifact + # if: ${{ success() }} + # uses: actions/upload-artifact@v2 + # with: + # name: benchmarks + # path: benchmarks/output_${{ matrix.package }}.json + # combine-benchmarks: + # runs-on: ubuntu-latest + # needs: build + # if: ${{ always() }} + # name: Combine benchmarks from previous build job + # steps: + # - name: Checkout Contrib Repo @ SHA - ${{ github.sha }} + # uses: actions/checkout@v2 + # - name: Download all benchmarks as artifact using key + # uses: actions/download-artifact@v2 + # with: + # name: benchmarks + # path: benchmarks + # - name: Find and merge all benchmarks + # run: >- + # jq -s '.[0].benchmarks = ([.[].benchmarks] | add) + # | if .[0].benchmarks == null then null else .[0] end' + # benchmarks/output_*.json > output.json; + # - name: Report on benchmark results + # uses: benchmark-action/github-action-benchmark@v1 + # with: + # name: OpenTelemetry Python Benchmarks - Python ${{ env[matrix.python-version ]}} - ${{ matrix.package }} + # tool: pytest + # output-file-path: output.json + # github-token: ${{ secrets.GITHUB_TOKEN }} + # max-items-in-chart: 100 + # # Alert with a commit comment on possible performance regression + # alert-threshold: 200% + # fail-on-alert: true + # # Make a commit on `gh-pages` with benchmarks from previous step + # auto-push: ${{ github.ref == 'refs/heads/main' }} + # gh-pages-branch: gh-pages + # benchmark-data-dir-path: benchmarks misc: strategy: fail-fast: false @@ -24,7 +107,7 @@ jobs: with: python-version: "3.10" - name: Install tox - run: pip install tox + run: pip install tox==3.27.1 - name: Install libsnappy-dev if: ${{ matrix.tox-environment == 'lint' }} run: sudo apt-get install -y libsnappy-dev diff --git a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py index 8e4d699cbb..fc80d9a5e5 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py @@ -510,6 +510,16 @@ def __init__( unit="By", description="Measures the size of HTTP request messages (compressed).", ) + self.server_response_size_histogram = self.meter.create_histogram( + name=MetricInstruments.HTTP_SERVER_RESPONSE_SIZE, + unit="By", + description="measures the size of HTTP response messages (compressed).", + ) + self.server_request_size_histogram = self.meter.create_histogram( + name=MetricInstruments.HTTP_SERVER_REQUEST_SIZE, + unit="By", + description="Measures the size of HTTP request messages (compressed).", + ) self.active_requests_counter = self.meter.create_up_down_counter( name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS, unit="requests", diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index 58ef87c11c..b1f1f33fb8 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -234,6 +234,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs): attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str)) elif call_context.service == "events" and call_context.operation == "PutEvents": call_context.span_kind = SpanKind.PRODUCER + attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str)) else: attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str)) except Exception as ex: @@ -483,7 +484,7 @@ def set( val = {"DataType": "String", "StringValue": value} carrier[key] = val -def limit_string_size(s: str, max_size: int) -> str: +def limit_string_size(max_size: int, s: str) -> str: if len(s) > max_size: return s[:max_size] else: diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_dynamodb.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_dynamodb.py new file mode 100644 index 0000000000..1b7f5bb0cb --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_dynamodb.py @@ -0,0 +1,506 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from unittest import mock + +import botocore.session +from moto import mock_dynamodb2 # pylint: disable=import-error + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.instrumentation.botocore.extensions.dynamodb import ( + _DynamoDbExtension, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.span import Span + +# pylint: disable=too-many-public-methods + + +class TestDynamoDbExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.client = session.create_client( + "dynamodb", region_name="us-west-2" + ) + self.default_table_name = "test_table" + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + def _create_table(self, **kwargs): + create_args = { + "TableName": self.default_table_name, + "AttributeDefinitions": [ + {"AttributeName": "id", "AttributeType": "S"}, + {"AttributeName": "idl", "AttributeType": "S"}, + {"AttributeName": "idg", "AttributeType": "S"}, + ], + "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5, + }, + "LocalSecondaryIndexes": [ + { + "IndexName": "lsi", + "KeySchema": [{"AttributeName": "idl", "KeyType": "HASH"}], + "Projection": {"ProjectionType": "KEYS_ONLY"}, + } + ], + "GlobalSecondaryIndexes": [ + { + "IndexName": "gsi", + "KeySchema": [{"AttributeName": "idg", "KeyType": "HASH"}], + "Projection": {"ProjectionType": "KEYS_ONLY"}, + } + ], + } + create_args.update(kwargs) + + self.client.create_table(**create_args) + + def _create_prepared_table(self, **kwargs): + self._create_table(**kwargs) + + table = kwargs.get("TableName", self.default_table_name) + self.client.put_item( + TableName=table, + Item={"id": {"S": "1"}, "idl": {"S": "2"}, "idg": {"S": "3"}}, + ) + + self.memory_exporter.clear() + + @staticmethod + def _create_extension(operation: str) -> _DynamoDbExtension: + call_context = mock.MagicMock(operation=operation) + return _DynamoDbExtension(call_context) + + def assert_span(self, operation: str) -> Span: + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + span = spans[0] + + self.assertEqual("dynamodb", span.attributes[SpanAttributes.DB_SYSTEM]) + self.assertEqual( + operation, span.attributes[SpanAttributes.DB_OPERATION] + ) + self.assertEqual( + "dynamodb.us-west-2.amazonaws.com", + span.attributes[SpanAttributes.NET_PEER_NAME], + ) + return span + + def assert_table_names(self, span: Span, *table_names): + self.assertEqual( + tuple(table_names), + span.attributes[SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], + ) + + def assert_consumed_capacity(self, span: Span, *table_names): + cap = span.attributes[SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY] + self.assertEqual(len(cap), len(table_names)) + cap_tables = set() + for item in cap: + # should be like {"TableName": name, "CapacityUnits": number, ...} + deserialized = json.loads(item) + cap_tables.add(deserialized["TableName"]) + for table_name in table_names: + self.assertIn(table_name, cap_tables) + + def assert_item_col_metrics(self, span: Span): + actual = span.attributes[ + SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS + ] + self.assertIsNotNone(actual) + json.loads(actual) + + def assert_provisioned_read_cap(self, span: Span, expected: int): + actual = span.attributes[ + SpanAttributes.AWS_DYNAMODB_PROVISIONED_READ_CAPACITY + ] + self.assertEqual(expected, actual) + + def assert_provisioned_write_cap(self, span: Span, expected: int): + actual = span.attributes[ + SpanAttributes.AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY + ] + self.assertEqual(expected, actual) + + def assert_consistent_read(self, span: Span, expected: bool): + actual = span.attributes[SpanAttributes.AWS_DYNAMODB_CONSISTENT_READ] + self.assertEqual(expected, actual) + + def assert_projection(self, span: Span, expected: str): + actual = span.attributes[SpanAttributes.AWS_DYNAMODB_PROJECTION] + self.assertEqual(expected, actual) + + def assert_attributes_to_get(self, span: Span, *attrs): + self.assertEqual( + tuple(attrs), + span.attributes[SpanAttributes.AWS_DYNAMODB_ATTRIBUTES_TO_GET], + ) + + def assert_index_name(self, span: Span, expected: str): + self.assertEqual( + expected, span.attributes[SpanAttributes.AWS_DYNAMODB_INDEX_NAME] + ) + + def assert_limit(self, span: Span, expected: int): + self.assertEqual( + expected, span.attributes[SpanAttributes.AWS_DYNAMODB_LIMIT] + ) + + def assert_select(self, span: Span, expected: str): + self.assertEqual( + expected, span.attributes[SpanAttributes.AWS_DYNAMODB_SELECT] + ) + + def assert_extension_item_col_metrics(self, operation: str): + span = self.tracer_provider.get_tracer("test").start_span("test") + extension = self._create_extension(operation) + + extension.on_success( + span, {"ItemCollectionMetrics": {"ItemCollectionKey": {"id": "1"}}} + ) + self.assert_item_col_metrics(span) + + @mock_dynamodb2 + def test_batch_get_item(self): + table_name1 = "test_table1" + table_name2 = "test_table2" + self._create_prepared_table(TableName=table_name1) + self._create_prepared_table(TableName=table_name2) + + self.client.batch_get_item( + RequestItems={ + table_name1: {"Keys": [{"id": {"S": "test_key"}}]}, + table_name2: {"Keys": [{"id": {"S": "test_key2"}}]}, + }, + ReturnConsumedCapacity="TOTAL", + ) + + span = self.assert_span("BatchGetItem") + self.assert_table_names(span, table_name1, table_name2) + self.assert_consumed_capacity(span, table_name1, table_name2) + + @mock_dynamodb2 + def test_batch_write_item(self): + table_name1 = "test_table1" + table_name2 = "test_table2" + self._create_prepared_table(TableName=table_name1) + self._create_prepared_table(TableName=table_name2) + + self.client.batch_write_item( + RequestItems={ + table_name1: [{"PutRequest": {"Item": {"id": {"S": "123"}}}}], + table_name2: [{"PutRequest": {"Item": {"id": {"S": "456"}}}}], + }, + ReturnConsumedCapacity="TOTAL", + ReturnItemCollectionMetrics="SIZE", + ) + + span = self.assert_span("BatchWriteItem") + self.assert_table_names(span, table_name1, table_name2) + self.assert_consumed_capacity(span, table_name1, table_name2) + self.assert_item_col_metrics(span) + + @mock_dynamodb2 + def test_create_table(self): + local_sec_idx = { + "IndexName": "local_sec_idx", + "KeySchema": [{"AttributeName": "value", "KeyType": "HASH"}], + "Projection": {"ProjectionType": "KEYS_ONLY"}, + } + global_sec_idx = { + "IndexName": "global_sec_idx", + "KeySchema": [{"AttributeName": "value", "KeyType": "HASH"}], + "Projection": {"ProjectionType": "KEYS_ONLY"}, + } + + self.client.create_table( + AttributeDefinitions=[ + {"AttributeName": "id", "AttributeType": "S"}, + {"AttributeName": "value", "AttributeType": "S"}, + ], + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + LocalSecondaryIndexes=[local_sec_idx], + GlobalSecondaryIndexes=[global_sec_idx], + ProvisionedThroughput={ + "ReadCapacityUnits": 42, + "WriteCapacityUnits": 17, + }, + TableName=self.default_table_name, + ) + + span = self.assert_span("CreateTable") + self.assert_table_names(span, self.default_table_name) + self.assertEqual( + (json.dumps(global_sec_idx),), + span.attributes[ + SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEXES + ], + ) + self.assertEqual( + (json.dumps(local_sec_idx),), + span.attributes[ + SpanAttributes.AWS_DYNAMODB_LOCAL_SECONDARY_INDEXES + ], + ) + self.assert_provisioned_read_cap(span, 42) + + @mock_dynamodb2 + def test_delete_item(self): + self._create_prepared_table() + + self.client.delete_item( + TableName=self.default_table_name, + Key={"id": {"S": "1"}}, + ReturnConsumedCapacity="TOTAL", + ReturnItemCollectionMetrics="SIZE", + ) + + span = self.assert_span("DeleteItem") + self.assert_table_names(span, self.default_table_name) + # moto does not seem to return these: + # self.assert_consumed_capacity(span, self.default_table_name) + # self.assert_item_coll_metrics(span) + + def test_delete_item_consumed_capacity(self): + span = self.tracer_provider.get_tracer("test").start_span("test") + extension = self._create_extension("DeleteItem") + + extension.on_success( + span, {"ConsumedCapacity": {"TableName": "table"}} + ) + self.assert_consumed_capacity(span, "table") + + def test_delete_item_item_collection_metrics(self): + self.assert_extension_item_col_metrics("DeleteItem") + + @mock_dynamodb2 + def test_delete_table(self): + self._create_prepared_table() + + self.client.delete_table(TableName=self.default_table_name) + + span = self.assert_span("DeleteTable") + self.assert_table_names(span, self.default_table_name) + + @mock_dynamodb2 + def test_describe_table(self): + self._create_prepared_table() + + self.client.describe_table(TableName=self.default_table_name) + + span = self.assert_span("DescribeTable") + self.assert_table_names(span, self.default_table_name) + + @mock_dynamodb2 + def test_get_item(self): + self._create_prepared_table() + + self.client.get_item( + TableName=self.default_table_name, + Key={"id": {"S": "1"}}, + ConsistentRead=True, + AttributesToGet=["id"], + ProjectionExpression="1,2", + ReturnConsumedCapacity="TOTAL", + ) + + span = self.assert_span("GetItem") + self.assert_table_names(span, self.default_table_name) + self.assert_consistent_read(span, True) + self.assert_projection(span, "1,2") + self.assert_consumed_capacity(span, self.default_table_name) + + @mock_dynamodb2 + def test_list_tables(self): + self._create_table(TableName="my_table") + self._create_prepared_table() + + self.client.list_tables(ExclusiveStartTableName="my_table", Limit=5) + + span = self.assert_span("ListTables") + self.assertEqual( + "my_table", + span.attributes[SpanAttributes.AWS_DYNAMODB_EXCLUSIVE_START_TABLE], + ) + self.assertEqual( + 1, span.attributes[SpanAttributes.AWS_DYNAMODB_TABLE_COUNT] + ) + self.assertEqual(5, span.attributes[SpanAttributes.AWS_DYNAMODB_LIMIT]) + + @mock_dynamodb2 + def test_put_item(self): + table = "test_table" + self._create_prepared_table(TableName=table) + + self.client.put_item( + TableName=table, + Item={"id": {"S": "1"}, "idl": {"S": "2"}, "idg": {"S": "3"}}, + ReturnConsumedCapacity="TOTAL", + ReturnItemCollectionMetrics="SIZE", + ) + + span = self.assert_span("PutItem") + self.assert_table_names(span, table) + self.assert_consumed_capacity(span, table) + # moto does not seem to return these: + # self.assert_item_coll_metrics(span) + + def test_put_item_item_collection_metrics(self): + self.assert_extension_item_col_metrics("PutItem") + + @mock_dynamodb2 + def test_query(self): + self._create_prepared_table() + + self.client.query( + TableName=self.default_table_name, + IndexName="lsi", + Select="ALL_ATTRIBUTES", + AttributesToGet=["id"], + Limit=42, + ConsistentRead=True, + KeyConditions={ + "id": { + "AttributeValueList": [{"S": "123"}], + "ComparisonOperator": "EQ", + } + }, + ScanIndexForward=True, + ProjectionExpression="1,2", + ReturnConsumedCapacity="TOTAL", + ) + + span = self.assert_span("Query") + self.assert_table_names(span, self.default_table_name) + self.assertTrue( + span.attributes[SpanAttributes.AWS_DYNAMODB_SCAN_FORWARD] + ) + self.assert_attributes_to_get(span, "id") + self.assert_consistent_read(span, True) + self.assert_index_name(span, "lsi") + self.assert_limit(span, 42) + self.assert_projection(span, "1,2") + self.assert_select(span, "ALL_ATTRIBUTES") + self.assert_consumed_capacity(span, self.default_table_name) + + @mock_dynamodb2 + def test_scan(self): + self._create_prepared_table() + + self.client.scan( + TableName=self.default_table_name, + IndexName="lsi", + AttributesToGet=["id", "idl"], + Limit=42, + Select="ALL_ATTRIBUTES", + TotalSegments=17, + Segment=21, + ProjectionExpression="1,2", + ConsistentRead=True, + ReturnConsumedCapacity="TOTAL", + ) + + span = self.assert_span("Scan") + self.assert_table_names(span, self.default_table_name) + self.assertEqual( + 21, span.attributes[SpanAttributes.AWS_DYNAMODB_SEGMENT] + ) + self.assertEqual( + 17, span.attributes[SpanAttributes.AWS_DYNAMODB_TOTAL_SEGMENTS] + ) + self.assertEqual(1, span.attributes[SpanAttributes.AWS_DYNAMODB_COUNT]) + self.assertEqual( + 1, span.attributes[SpanAttributes.AWS_DYNAMODB_SCANNED_COUNT] + ) + self.assert_attributes_to_get(span, "id", "idl") + self.assert_consistent_read(span, True) + self.assert_index_name(span, "lsi") + self.assert_limit(span, 42) + self.assert_projection(span, "1,2") + self.assert_select(span, "ALL_ATTRIBUTES") + self.assert_consumed_capacity(span, self.default_table_name) + + @mock_dynamodb2 + def test_update_item(self): + self._create_prepared_table() + + self.client.update_item( + TableName=self.default_table_name, + Key={"id": {"S": "123"}}, + AttributeUpdates={"id": {"Value": {"S": "456"}, "Action": "PUT"}}, + ReturnConsumedCapacity="TOTAL", + ReturnItemCollectionMetrics="SIZE", + ) + + span = self.assert_span("UpdateItem") + self.assert_table_names(span, self.default_table_name) + self.assert_consumed_capacity(span, self.default_table_name) + # moto does not seem to return these: + # self.assert_item_coll_metrics(span) + + def test_update_item_item_collection_metrics(self): + self.assert_extension_item_col_metrics("UpdateItem") + + @mock_dynamodb2 + def test_update_table(self): + self._create_prepared_table() + + global_sec_idx_updates = { + "Update": { + "IndexName": "gsi", + "ProvisionedThroughput": { + "ReadCapacityUnits": 777, + "WriteCapacityUnits": 666, + }, + } + } + attr_definition = {"AttributeName": "id", "AttributeType": "N"} + + self.client.update_table( + TableName=self.default_table_name, + AttributeDefinitions=[attr_definition], + ProvisionedThroughput={ + "ReadCapacityUnits": 23, + "WriteCapacityUnits": 19, + }, + GlobalSecondaryIndexUpdates=[global_sec_idx_updates], + ) + + span = self.assert_span("UpdateTable") + self.assert_table_names(span, self.default_table_name) + self.assert_provisioned_read_cap(span, 23) + self.assert_provisioned_write_cap(span, 19) + self.assertEqual( + (json.dumps(attr_definition),), + span.attributes[SpanAttributes.AWS_DYNAMODB_ATTRIBUTE_DEFINITIONS], + ) + self.assertEqual( + (json.dumps(global_sec_idx_updates),), + span.attributes[ + SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEX_UPDATES + ], + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_instrumentation.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_instrumentation.py new file mode 100644 index 0000000000..3d25dcbf2d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_instrumentation.py @@ -0,0 +1,425 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from unittest.mock import Mock, patch + +import botocore.session +from botocore.exceptions import ParamValidationError +from moto import ( # pylint: disable=import-error + mock_ec2, + mock_kinesis, + mock_kms, + mock_s3, + mock_sqs, + mock_sts, + mock_xray, +) + +from opentelemetry import trace as trace_api +from opentelemetry.context import ( + _SUPPRESS_HTTP_INSTRUMENTATION_KEY, + attach, + detach, + set_value, +) +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagate import get_global_textmap, set_global_textmap +from opentelemetry.propagators.aws.aws_xray_propagator import TRACE_HEADER_KEY +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.mock_textmap import MockTextMapPropagator +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.span import format_span_id, format_trace_id + +_REQUEST_ID_REGEX_MATCH = r"[A-Z0-9]{52}" + + +# pylint:disable=too-many-public-methods +class TestBotocoreInstrumentor(TestBase): + """Botocore integration testsuite""" + + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + self.session = botocore.session.get_session() + self.session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.region = "us-west-2" + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + def _make_client(self, service: str): + return self.session.create_client(service, region_name=self.region) + + def _default_span_attributes(self, service: str, operation: str): + return { + SpanAttributes.RPC_SYSTEM: "aws-api", + SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: operation, + "aws.region": self.region, + "retry_attempts": 0, + SpanAttributes.HTTP_STATUS_CODE: 200, + } + + def assert_only_span(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + return spans[0] + + def assert_span( + self, + service: str, + operation: str, + request_id=None, + attributes=None, + ): + span = self.assert_only_span() + expected = self._default_span_attributes(service, operation) + if attributes: + expected.update(attributes) + + span_attributes_request_id = "aws.request_id" + if request_id is _REQUEST_ID_REGEX_MATCH: + actual_request_id = span.attributes[span_attributes_request_id] + self.assertRegex(actual_request_id, _REQUEST_ID_REGEX_MATCH) + expected[span_attributes_request_id] = actual_request_id + elif request_id is not None: + expected[span_attributes_request_id] = request_id + + self.assertSpanHasAttributes(span, expected) + self.assertEqual(f"{service}.{operation}", span.name) + return span + + @mock_ec2 + def test_traced_client(self): + ec2 = self._make_client("ec2") + + ec2.describe_instances() + + request_id = "fdcdcab1-ae5c-489e-9c33-4637c5dda355" + self.assert_span("EC2", "DescribeInstances", request_id=request_id) + + @mock_ec2 + def test_not_recording(self): + mock_tracer = Mock() + mock_span = Mock() + mock_span.is_recording.return_value = False + mock_tracer.start_span.return_value = mock_span + with patch("opentelemetry.trace.get_tracer") as tracer: + tracer.return_value = mock_tracer + ec2 = self._make_client("ec2") + ec2.describe_instances() + self.assertFalse(mock_span.is_recording()) + self.assertTrue(mock_span.is_recording.called) + self.assertFalse(mock_span.set_attribute.called) + self.assertFalse(mock_span.set_status.called) + + @mock_s3 + def test_exception(self): + s3 = self._make_client("s3") + + with self.assertRaises(ParamValidationError): + s3.list_objects(bucket="mybucket") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + span = spans[0] + + expected = self._default_span_attributes("S3", "ListObjects") + expected.pop(SpanAttributes.HTTP_STATUS_CODE) + expected.pop("retry_attempts") + self.assertEqual(expected, span.attributes) + self.assertIs(span.status.status_code, trace_api.StatusCode.ERROR) + + self.assertEqual(1, len(span.events)) + event = span.events[0] + self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes) + self.assertIn(SpanAttributes.EXCEPTION_TYPE, event.attributes) + self.assertIn(SpanAttributes.EXCEPTION_MESSAGE, event.attributes) + + @mock_s3 + def test_s3_client(self): + s3 = self._make_client("s3") + + s3.list_buckets() + self.assert_span("S3", "ListBuckets") + + @mock_s3 + def test_s3_put(self): + s3 = self._make_client("s3") + + location = {"LocationConstraint": "us-west-2"} + s3.create_bucket(Bucket="mybucket", CreateBucketConfiguration=location) + self.assert_span( + "S3", "CreateBucket", request_id=_REQUEST_ID_REGEX_MATCH + ) + self.memory_exporter.clear() + + s3.put_object(Key="foo", Bucket="mybucket", Body=b"bar") + self.assert_span("S3", "PutObject", request_id=_REQUEST_ID_REGEX_MATCH) + self.memory_exporter.clear() + + s3.get_object(Bucket="mybucket", Key="foo") + self.assert_span("S3", "GetObject", request_id=_REQUEST_ID_REGEX_MATCH) + + @mock_sqs + def test_sqs_client(self): + sqs = self._make_client("sqs") + + sqs.list_queues() + + self.assert_span( + "SQS", "ListQueues", request_id=_REQUEST_ID_REGEX_MATCH + ) + + @mock_sqs + def test_sqs_send_message(self): + sqs = self._make_client("sqs") + test_queue_name = "test_queue_name" + + response = sqs.create_queue(QueueName=test_queue_name) + self.assert_span( + "SQS", "CreateQueue", request_id=_REQUEST_ID_REGEX_MATCH + ) + self.memory_exporter.clear() + + queue_url = response["QueueUrl"] + sqs.send_message(QueueUrl=queue_url, MessageBody="Test SQS MESSAGE!") + + self.assert_span( + "SQS", + "SendMessage", + request_id=_REQUEST_ID_REGEX_MATCH, + attributes={"aws.queue_url": queue_url}, + ) + + @mock_kinesis + def test_kinesis_client(self): + kinesis = self._make_client("kinesis") + + kinesis.list_streams() + self.assert_span("Kinesis", "ListStreams") + + @mock_kinesis + def test_unpatch(self): + kinesis = self._make_client("kinesis") + + BotocoreInstrumentor().uninstrument() + + kinesis.list_streams() + self.assertEqual(0, len(self.memory_exporter.get_finished_spans())) + + @mock_ec2 + def test_uninstrument_does_not_inject_headers(self): + headers = {} + + def intercept_headers(**kwargs): + headers.update(kwargs["request"].headers) + + ec2 = self._make_client("ec2") + + BotocoreInstrumentor().uninstrument() + + ec2.meta.events.register_first( + "before-send.ec2.DescribeInstances", intercept_headers + ) + with self.tracer_provider.get_tracer("test").start_span("parent"): + ec2.describe_instances() + + self.assertNotIn(TRACE_HEADER_KEY, headers) + + @mock_sqs + def test_double_patch(self): + sqs = self._make_client("sqs") + + BotocoreInstrumentor().instrument() + BotocoreInstrumentor().instrument() + + sqs.list_queues() + self.assert_span( + "SQS", "ListQueues", request_id=_REQUEST_ID_REGEX_MATCH + ) + + @mock_kms + def test_kms_client(self): + kms = self._make_client("kms") + + kms.list_keys(Limit=21) + + span = self.assert_only_span() + # check for exact attribute set to make sure not to leak any kms secrets + self.assertEqual( + self._default_span_attributes("KMS", "ListKeys"), span.attributes + ) + + @mock_sts + def test_sts_client(self): + sts = self._make_client("sts") + + sts.get_caller_identity() + + span = self.assert_only_span() + expected = self._default_span_attributes("STS", "GetCallerIdentity") + expected["aws.request_id"] = "c6104cbe-af31-11e0-8154-cbc7ccf896c7" + # check for exact attribute set to make sure not to leak any sts secrets + self.assertEqual(expected, span.attributes) + + @mock_ec2 + def test_propagator_injects_into_request(self): + headers = {} + previous_propagator = get_global_textmap() + + def check_headers(**kwargs): + nonlocal headers + headers = kwargs["request"].headers + + try: + set_global_textmap(MockTextMapPropagator()) + + ec2 = self._make_client("ec2") + ec2.meta.events.register_first( + "before-send.ec2.DescribeInstances", check_headers + ) + ec2.describe_instances() + + request_id = "fdcdcab1-ae5c-489e-9c33-4637c5dda355" + span = self.assert_span( + "EC2", "DescribeInstances", request_id=request_id + ) + + # only x-ray propagation is used in HTTP requests + self.assertIn(TRACE_HEADER_KEY, headers) + xray_context = headers[TRACE_HEADER_KEY] + formated_trace_id = format_trace_id( + span.get_span_context().trace_id + ) + formated_trace_id = ( + formated_trace_id[:8] + "-" + formated_trace_id[8:] + ) + + self.assertEqual( + xray_context.lower(), + f"root=1-{formated_trace_id};parent={format_span_id(span.get_span_context().span_id)};sampled=1".lower(), + ) + finally: + set_global_textmap(previous_propagator) + + @mock_ec2 + def test_override_xray_propagator_injects_into_request(self): + headers = {} + + def check_headers(**kwargs): + nonlocal headers + headers = kwargs["request"].headers + + BotocoreInstrumentor().instrument() + + ec2 = self._make_client("ec2") + ec2.meta.events.register_first( + "before-send.ec2.DescribeInstances", check_headers + ) + ec2.describe_instances() + + self.assertNotIn(MockTextMapPropagator.TRACE_ID_KEY, headers) + self.assertNotIn(MockTextMapPropagator.SPAN_ID_KEY, headers) + + @mock_xray + def test_suppress_instrumentation_xray_client(self): + xray_client = self._make_client("xray") + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + xray_client.put_trace_segments(TraceSegmentDocuments=["str1"]) + xray_client.put_trace_segments(TraceSegmentDocuments=["str2"]) + finally: + detach(token) + self.assertEqual(0, len(self.get_finished_spans())) + + @mock_xray + def test_suppress_http_instrumentation_xray_client(self): + xray_client = self._make_client("xray") + token = attach(set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)) + try: + xray_client.put_trace_segments(TraceSegmentDocuments=["str1"]) + xray_client.put_trace_segments(TraceSegmentDocuments=["str2"]) + finally: + detach(token) + self.assertEqual(2, len(self.get_finished_spans())) + + @mock_s3 + def test_request_hook(self): + request_hook_service_attribute_name = "request_hook.service_name" + request_hook_operation_attribute_name = "request_hook.operation_name" + request_hook_api_params_attribute_name = "request_hook.api_params" + + def request_hook(span, service_name, operation_name, api_params): + hook_attributes = { + request_hook_service_attribute_name: service_name, + request_hook_operation_attribute_name: operation_name, + request_hook_api_params_attribute_name: json.dumps(api_params), + } + + span.set_attributes(hook_attributes) + + BotocoreInstrumentor().uninstrument() + BotocoreInstrumentor().instrument(request_hook=request_hook) + + s3 = self._make_client("s3") + + params = { + "Bucket": "mybucket", + "CreateBucketConfiguration": {"LocationConstraint": "us-west-2"}, + } + s3.create_bucket(**params) + self.assert_span( + "S3", + "CreateBucket", + attributes={ + request_hook_service_attribute_name: "s3", + request_hook_operation_attribute_name: "CreateBucket", + request_hook_api_params_attribute_name: json.dumps(params), + }, + ) + + @mock_s3 + def test_response_hook(self): + response_hook_service_attribute_name = "request_hook.service_name" + response_hook_operation_attribute_name = "response_hook.operation_name" + response_hook_result_attribute_name = "response_hook.result" + + def response_hook(span, service_name, operation_name, result): + hook_attributes = { + response_hook_service_attribute_name: service_name, + response_hook_operation_attribute_name: operation_name, + response_hook_result_attribute_name: len(result["Buckets"]), + } + span.set_attributes(hook_attributes) + + BotocoreInstrumentor().uninstrument() + BotocoreInstrumentor().instrument(response_hook=response_hook) + + s3 = self._make_client("s3") + s3.list_buckets() + self.assert_span( + "S3", + "ListBuckets", + attributes={ + response_hook_service_attribute_name: "s3", + response_hook_operation_attribute_name: "ListBuckets", + response_hook_result_attribute_name: 0, + }, + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_lambda.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_lambda.py new file mode 100644 index 0000000000..7388323100 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_lambda.py @@ -0,0 +1,187 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import json +import sys +import zipfile +from unittest import mock + +import botocore.session +from moto import mock_iam, mock_lambda # pylint: disable=import-error +from pytest import mark + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.instrumentation.botocore.extensions.lmbd import ( + _LambdaExtension, +) +from opentelemetry.propagate import get_global_textmap, set_global_textmap +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.mock_textmap import MockTextMapPropagator +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.span import Span + + +def get_as_zip_file(file_name, content): + zip_output = io.BytesIO() + with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file: + zip_file.writestr(file_name, content) + zip_output.seek(0) + return zip_output.read() + + +def return_headers_lambda_str(): + pfunc = """ +def lambda_handler(event, context): + print("custom log event") + headers = event.get('headers', event.get('attributes', {})) + return headers +""" + return pfunc + + +class TestLambdaExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.region = "us-west-2" + self.client = session.create_client("lambda", region_name=self.region) + self.iam_client = session.create_client("iam", region_name=self.region) + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + def assert_span(self, operation: str) -> Span: + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + + span = spans[0] + self.assertEqual(operation, span.attributes[SpanAttributes.RPC_METHOD]) + self.assertEqual("Lambda", span.attributes[SpanAttributes.RPC_SERVICE]) + self.assertEqual("aws-api", span.attributes[SpanAttributes.RPC_SYSTEM]) + return span + + def assert_invoke_span(self, function_name: str) -> Span: + span = self.assert_span("Invoke") + self.assertEqual( + "aws", span.attributes[SpanAttributes.FAAS_INVOKED_PROVIDER] + ) + self.assertEqual( + self.region, span.attributes[SpanAttributes.FAAS_INVOKED_REGION] + ) + self.assertEqual( + function_name, span.attributes[SpanAttributes.FAAS_INVOKED_NAME] + ) + return span + + @staticmethod + def _create_extension(operation: str) -> _LambdaExtension: + mock_call_context = mock.MagicMock(operation=operation, params={}) + return _LambdaExtension(mock_call_context) + + @mock_lambda + def test_list_functions(self): + self.client.list_functions() + self.assert_span("ListFunctions") + + @mock_iam + def _create_role_and_get_arn(self) -> str: + return self.iam_client.create_role( + RoleName="my-role", + AssumeRolePolicyDocument="some policy", + Path="/my-path/", + )["Role"]["Arn"] + + def _create_lambda_function(self, function_name: str, function_code: str): + role_arn = self._create_role_and_get_arn() + + self.client.create_function( + FunctionName=function_name, + Runtime="python3.8", + Role=role_arn, + Handler="lambda_function.lambda_handler", + Code={ + "ZipFile": get_as_zip_file("lambda_function.py", function_code) + }, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + ) + + @mark.skip(reason="Docker error, unblocking builds for now.") + @mark.skipif( + sys.platform == "win32", + reason="requires docker and Github CI Windows does not have docker installed by default", + ) + @mock_lambda + def test_invoke(self): + previous_propagator = get_global_textmap() + try: + set_global_textmap(MockTextMapPropagator()) + function_name = "testFunction" + self._create_lambda_function( + function_name, return_headers_lambda_str() + ) + # 2 spans for create IAM + create lambda + self.assertEqual(2, len(self.memory_exporter.get_finished_spans())) + self.memory_exporter.clear() + + response = self.client.invoke( + Payload=json.dumps({}), + FunctionName=function_name, + InvocationType="RequestResponse", + ) + + span = self.assert_invoke_span(function_name) + span_context = span.get_span_context() + + # # assert injected span + headers = json.loads(response["Payload"].read().decode("utf-8")) + self.assertEqual( + str(span_context.trace_id), + headers[MockTextMapPropagator.TRACE_ID_KEY], + ) + self.assertEqual( + str(span_context.span_id), + headers[MockTextMapPropagator.SPAN_ID_KEY], + ) + finally: + set_global_textmap(previous_propagator) + + def test_invoke_parse_arn(self): + function_name = "my_func" + arns = ( + f"arn:aws:lambda:{self.region}:000000000000:function:{function_name}", # full arn + f"000000000000:{function_name}", # partial arn + f"arn:aws:lambda:{self.region}:000000000000:function:{function_name}:alias", # aliased arn + ) + + for arn in arns: + with self.subTest(arn=arn): + extension = self._create_extension("Invoke") + extension._call_context.params["FunctionName"] = arn + + attributes = {} + extension.extract_attributes(attributes) + + self.assertEqual( + function_name, attributes[SpanAttributes.FAAS_INVOKED_NAME] + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_messaging.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_messaging.py new file mode 100644 index 0000000000..d8a92e0cf9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_messaging.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.instrumentation.botocore.extensions._messaging import ( + inject_propagation_context, + message_attributes_setter, +) +from opentelemetry.test.test_base import TestBase + + +class TestMessageAttributes(TestBase): + def test_message_attributes_setter(self): + carrier = {} + + message_attributes_setter.set(carrier, "key", "value") + self.assertEqual( + {"key": {"DataType": "String", "StringValue": "value"}}, carrier + ) + + def test_inject_propagation_context(self): + carrier = { + "key1": {"DataType": "String", "StringValue": "value1"}, + "key2": {"DataType": "String", "StringValue": "value2"}, + } + + tracer = self.tracer_provider.get_tracer("test-tracer") + with tracer.start_as_current_span("span"): + inject_propagation_context(carrier) + + self.assertGreater(len(carrier), 2) + + def test_inject_propagation_context_too_many_attributes(self): + carrier = { + f"key{idx}": {"DataType": "String", "StringValue": f"value{idx}"} + for idx in range(10) + } + tracer = self.tracer_provider.get_tracer("test-tracer") + with tracer.start_as_current_span("span"): + inject_propagation_context(carrier) + + self.assertEqual(10, len(carrier)) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_sns.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_sns.py new file mode 100644 index 0000000000..33f2531027 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_sns.py @@ -0,0 +1,189 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +from typing import Any, Dict +from unittest import mock + +import botocore.session +from botocore.awsrequest import AWSResponse +from moto import mock_sns + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.semconv.trace import ( + MessagingDestinationKindValues, + SpanAttributes, +) +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import Span + + +class TestSnsExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.client = session.create_client("sns", region_name="us-west-2") + self.topic_name = "my-topic" + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + def _create_topic(self, name: str = None) -> str: + if name is None: + name = self.topic_name + + response = self.client.create_topic(Name=name) + + self.memory_exporter.clear() + return response["TopicArn"] + + @contextlib.contextmanager + def _mocked_aws_endpoint(self, response): + response_func = self._make_aws_response_func(response) + with mock.patch( + "botocore.endpoint.Endpoint.make_request", new=response_func + ): + yield + + @staticmethod + def _make_aws_response_func(response): + def _response_func(*args, **kwargs): + return AWSResponse("http://127.0.0.1", 200, {}, "{}"), response + + return _response_func + + def assert_span(self, name: str) -> Span: + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + span = spans[0] + + self.assertEqual(SpanKind.PRODUCER, span.kind) + self.assertEqual(name, span.name) + self.assertEqual( + "aws.sns", span.attributes[SpanAttributes.MESSAGING_SYSTEM] + ) + + return span + + def assert_injected_span(self, message_attrs: Dict[str, Any], span: Span): + # traceparent: --- + trace_parent = message_attrs["traceparent"]["StringValue"].split("-") + span_context = span.get_span_context() + + self.assertEqual(span_context.trace_id, int(trace_parent[1], 16)) + self.assertEqual(span_context.span_id, int(trace_parent[2], 16)) + + @mock_sns + def test_publish_to_topic_arn(self): + self._test_publish_to_arn("TopicArn") + + @mock_sns + def test_publish_to_target_arn(self): + self._test_publish_to_arn("TargetArn") + + def _test_publish_to_arn(self, arg_name: str): + target_arn = self._create_topic(self.topic_name) + + self.client.publish( + **{ + arg_name: target_arn, + "Message": "Hello message", + } + ) + + span = self.assert_span(f"{self.topic_name} send") + self.assertEqual( + MessagingDestinationKindValues.TOPIC.value, + span.attributes[SpanAttributes.MESSAGING_DESTINATION_KIND], + ) + self.assertEqual( + self.topic_name, + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + ) + + @mock_sns + def test_publish_to_phone_number(self): + phone_number = "+10000000000" + self.client.publish( + PhoneNumber=phone_number, + Message="Hello SNS", + ) + + span = self.assert_span("phone_number send") + self.assertEqual( + phone_number, span.attributes[SpanAttributes.MESSAGING_DESTINATION] + ) + + @mock_sns + def test_publish_injects_span(self): + message_attrs = {} + topic_arn = self._create_topic() + self.client.publish( + TopicArn=topic_arn, + Message="Hello Message", + MessageAttributes=message_attrs, + ) + + span = self.assert_span(f"{self.topic_name} send") + self.assert_injected_span(message_attrs, span) + + def test_publish_batch_to_topic(self): + topic_arn = f"arn:aws:sns:region:000000000:{self.topic_name}" + message1_attrs = {} + message2_attrs = {} + mock_response = { + "Successful": [ + {"Id": "1", "MessageId": "11", "SequenceNumber": "1"}, + {"Id": "2", "MessageId": "22", "SequenceNumber": "2"}, + ], + "Failed": [], + } + + # publish_batch not implemented by moto so mock the endpoint instead + with self._mocked_aws_endpoint(mock_response): + self.client.publish_batch( + TopicArn=topic_arn, + PublishBatchRequestEntries=[ + { + "Id": "1", + "Message": "Hello message 1", + "MessageAttributes": message1_attrs, + }, + { + "Id": "2", + "Message": "Hello message 2", + "MessageAttributes": message2_attrs, + }, + ], + ) + + span = self.assert_span(f"{self.topic_name} send") + self.assertEqual( + MessagingDestinationKindValues.TOPIC.value, + span.attributes[SpanAttributes.MESSAGING_DESTINATION_KIND], + ) + self.assertEqual( + self.topic_name, + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + ) + + self.assert_injected_span(message1_attrs, span) + self.assert_injected_span(message2_attrs, span) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_sqs.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_sqs.py new file mode 100644 index 0000000000..6bcffd9274 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/tests/test_botocore_sqs.py @@ -0,0 +1,136 @@ +import botocore.session +from moto import mock_sqs + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + + +class TestSqsExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.region = "us-west-2" + self.client = session.create_client("sqs", region_name=self.region) + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + @mock_sqs + def test_sqs_messaging_send_message(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + response = self.client.send_message( + QueueUrl=queue_url, MessageBody="content" + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + span = spans[1] + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + response["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_send_message_batch(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + response = self.client.send_message_batch( + QueueUrl=queue_url, + Entries=[ + {"Id": "1", "MessageBody": "content"}, + {"Id": "2", "MessageBody": "content2"}, + ], + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + span = spans[1] + self.assertEqual(span.attributes["rpc.method"], "SendMessageBatch") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + response["Successful"][0]["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_receive_message(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + self.client.send_message(QueueUrl=queue_url, MessageBody="content") + message_result = self.client.receive_message( + QueueUrl=create_queue_result["QueueUrl"] + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 3) + span = spans[-1] + self.assertEqual(span.attributes["rpc.method"], "ReceiveMessage") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + message_result["Messages"][0]["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_failed_operation(self): + with self.assertRaises(Exception): + self.client.send_message( + QueueUrl="non-existing", MessageBody="content" + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.attributes["rpc.method"], "SendMessage") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], "non-existing" + ) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 6f4f9cbc3a..f92c5e03c8 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -15,6 +15,7 @@ import logging from celery import registry # pylint: disable=no-name-in-module +from billiard import VERSION from opentelemetry.semconv.trace import SpanAttributes diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index 2d2670fee3..234d756ef3 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -24,6 +24,24 @@ from opentelemetry.trace import SpanKind +class AsyncMock: + """A sufficient async mock implementation. + + Python 3.7 doesn't have an inbuilt async mock class, so this is used. + """ + + def __init__(self): + self.mock = mock.Mock() + + async def __call__(self, *args, **kwargs): + future = asyncio.Future() + future.set_result("random") + return future + + def __getattr__(self, item): + return AsyncMock() + + class TestRedis(TestBase): def setUp(self): super().setUp() diff --git a/opentelemetry-instrumentation/README.rst b/opentelemetry-instrumentation/README.rst index 6f66edb623..56f5f289e3 100644 --- a/opentelemetry-instrumentation/README.rst +++ b/opentelemetry-instrumentation/README.rst @@ -27,6 +27,9 @@ This package provides commands that help automatically instrument a program: When creating a custom distro and/or configurator, be sure to add entry points for each under `opentelemetry_distro` and `opentelemetry_configurator` respectfully. If you have entry points for multiple distros or configurators present in your environment, you should specify the entry point name of the distro and configurator you want to be used via the `OTEL_PYTHON_DISTRO` and `OTEL_PYTHON_CONFIGURATOR` environment variables. + When creating a custom distro and/or configurator, be sure to add entry points for each under `opentelemetry_distro` and `opentelemetry_configurator` respectfully. + If you have entry points for multiple distros or configurators present in your environment, you should specify the entry point name of the distro and configurator you want to be used via the `OTEL_PYTHON_DISTRO` and `OTEL_PYTHON_CONFIGURATOR` environment variables. + opentelemetry-bootstrap -----------------------