diff --git a/.github/workflows/generate_workflows.py b/.github/workflows/generate_workflows.py index fa60ada6225..2b456d29423 100644 --- a/.github/workflows/generate_workflows.py +++ b/.github/workflows/generate_workflows.py @@ -1,10 +1,10 @@ from pathlib import Path from generate_workflows_lib import ( - generate_test_workflow, - generate_lint_workflow, generate_contrib_workflow, - generate_misc_workflow + generate_lint_workflow, + generate_misc_workflow, + generate_test_workflow, ) tox_ini_path = Path(__file__).parent.parent.parent.joinpath("tox.ini") diff --git a/CHANGELOG.md b/CHANGELOG.md index 9848078bbde..c30312b01a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4094](https://github.com/open-telemetry/opentelemetry-python/pull/4094)) - Implement events sdk ([#4176](https://github.com/open-telemetry/opentelemetry-python/pull/4176)) +- Basic protection against the unintended cardinality explosion + ([#3486](https://github.com/open-telemetry/opentelemetry-python/pull/3486)) ## Version 1.27.0/0.48b0 (2024-08-28) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py index 5431d1fa02d..24502cc2a2c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -31,6 +31,8 @@ from opentelemetry.sdk.metrics._internal.view import View _logger = getLogger(__name__) +_OVERFLOW_ATTRIBUTE = ("otel.metric.overflow", "true") +_DEFAULT_AGGREGATION_LIMIT = 2000 class _ViewInstrumentMatch: @@ -45,6 +47,9 @@ def __init__( self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} self._lock = Lock() self._instrument_class_aggregation = instrument_class_aggregation + self._aggregation_cardinality_limit = ( + 
def get_aggregation_key(self, attributes):
    """Return the key under which a measurement with *attributes* is aggregated.

    The key is normally the frozenset of the attribute items. When the
    number of tracked attribute sets has reached
    ``self._aggregation_cardinality_limit - 1`` (one slot is reserved for
    the overflow stream), attribute sets that are not tracked yet are
    folded into the single key built from ``_OVERFLOW_ATTRIBUTE`` and a
    warning is logged.

    Attribute sets that already have an aggregation keep their own key
    even after the limit is reached, so streams created before the
    overflow continue to receive their measurements.

    Args:
        attributes: mapping of attribute names to values for the
            measurement being consumed.

    Returns:
        A frozenset usable as a key of ``self._attributes_aggregation``.
    """
    aggr_key = frozenset(attributes.items())

    # An attribute set that is already tracked never adds cardinality, so
    # it must not be redirected to the overflow stream.
    if aggr_key in self._attributes_aggregation:
        return aggr_key

    # Leave room for the new stream only while the limit allows it; the
    # "- 1" keeps one slot free for the overflow stream itself.
    if (
        len(self._attributes_aggregation)
        < self._aggregation_cardinality_limit - 1
    ):
        return aggr_key

    _logger.warning(
        "Metric cardinality limit of %s exceeded. Aggregating under overflow attribute.",
        self._aggregation_cardinality_limit,
    )
    return frozenset([_OVERFLOW_ATTRIBUTE])
_get_or_init_view_instrument_match( self, instrument: Instrument diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py index 5dd11be1f94..c566960a33e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py @@ -118,6 +118,7 @@ def __init__( Callable[[Type[_Aggregation]], ExemplarReservoirBuilder] ] = None, instrument_unit: Optional[str] = None, + aggregation_cardinality_limit: Optional[int] = None, ): if ( instrument_type @@ -158,6 +159,7 @@ def __init__( self._description = description self._attribute_keys = attribute_keys self._aggregation = aggregation or self._default_aggregation + self._aggregation_cardinality_limit = aggregation_cardinality_limit self._exemplar_reservoir_factory = ( exemplar_reservoir_factory or _default_reservoir_factory ) diff --git a/opentelemetry-sdk/tests/logs/test_log_record.py b/opentelemetry-sdk/tests/logs/test_log_record.py index 9c3746989b1..ee5932b1663 100644 --- a/opentelemetry-sdk/tests/logs/test_log_record.py +++ b/opentelemetry-sdk/tests/logs/test_log_record.py @@ -79,9 +79,18 @@ def test_log_record_dropped_attributes_set_limits_max_attribute(self): max_attributes=1, ) - result = LogRecord( - timestamp=0, body="a log line", attributes=attr, limits=limits - ) + with warnings.catch_warnings(record=True) as cw: + warnings.simplefilter("always") + result = LogRecord( + timestamp=0, body="a log line", attributes=attr, limits=limits + ) + + self.assertTrue( + any( + issubclass(warning.category, LogDroppedAttributesWarning) + for warning in cw + ) + ) self.assertTrue(result.dropped_attributes == 1) def test_log_record_dropped_attributes_set_limits_max_attribute_length( @@ -93,9 +102,18 @@ def test_log_record_dropped_attributes_set_limits_max_attribute_length( max_attribute_length=1, ) - result = LogRecord( - timestamp=0, body="a log line", 
from logging import WARNING
from unittest import TestCase

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader


class TestMetricCardinalityLimit(TestCase):
    """Checks that a counter fed with too many distinct attribute sets is
    capped at the default cardinality limit instead of growing unbounded."""

    def setUp(self):
        self.reader = InMemoryMetricReader()
        self.meter_provider = MeterProvider(metric_readers=[self.reader])
        self.meter = self.meter_provider.get_meter("test_meter")

    def test_metric_cardinality_limit(self):
        counter = self.meter.create_counter("cardinality_test_counter")

        # Record more distinct attribute sets than the default limit (2000).
        measurement_count = 2100
        for ind in range(measurement_count):
            counter.add(1, {"key": f"value_{ind}"})

        self.reader.force_flush()
        metric_data = self.reader.get_metrics_data()

        # The limit applies to the number of exported data points (one per
        # attribute set). Counting resource_metrics would always yield 1
        # and would pass even without any cardinality protection.
        data_points = [
            point
            for resource_metric in metric_data.resource_metrics
            for scope_metric in resource_metric.scope_metrics
            for metric in scope_metric.metrics
            for point in metric.data.data_points
        ]
        self.assertLessEqual(len(data_points), 2000)

        # Overflowing measurements are folded together, not dropped, so the
        # total recorded value must be preserved.
        self.assertEqual(
            sum(point.value for point in data_points), measurement_count
        )

        # A new attribute set beyond the limit must log a warning.
        with self.assertLogs(level=WARNING):
            counter.add(1, {"key": "value_2101"})