Skip to content

Commit

Permalink
Pass default temporality to constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed May 8, 2022
1 parent 5c2f5a2 commit bfebe11
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
view: "View",
instrument: "_Instrument",
sdk_config: SdkConfiguration,
instrument_class_temporality: Dict[type, AggregationTemporality],
instrument_class_aggregation: Dict[type, Aggregation],
):
self._view = view
Expand All @@ -52,6 +53,7 @@ def __init__(
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
self._lock = Lock()
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
self._description = (
Expand Down Expand Up @@ -123,9 +125,7 @@ def consume_measurement(self, measurement: Measurement) -> None:

self._attributes_aggregation[attributes].aggregate(measurement)

def collect(
self, instrument_class_temporality: Dict[type, AggregationTemporality]
) -> Iterable[Metric]:
def collect(self) -> Iterable[Metric]:

with self._lock:
for (
Expand Down Expand Up @@ -163,7 +163,7 @@ def collect(
point=_convert_aggregation_temporality(
previous_point,
current_point,
instrument_class_temporality[
self._instrument_class_temporality[
self._instrument.__class__
],
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from abc import ABC, abstractmethod
from threading import Lock
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping
from typing import TYPE_CHECKING, Iterable, List, Mapping

from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.sdk._metrics._internal.metric_reader_storage import (
Expand All @@ -25,7 +25,7 @@
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.sdk._metrics.point import Metric

if TYPE_CHECKING:
from opentelemetry.sdk._metrics._internal.instrument import _Asynchronous
Expand All @@ -44,7 +44,6 @@ def register_asynchronous_instrument(self, instrument: "_Asynchronous"):
def collect(
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
) -> Iterable[Metric]:
pass

Expand All @@ -56,7 +55,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
# should never be mutated
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
reader: MetricReaderStorage(
sdk_config, reader._instrument_class_aggregation
sdk_config,
reader._instrument_class_temporality,
reader._instrument_class_aggregation
)
for reader in sdk_config.metric_readers
}
Expand All @@ -75,7 +76,6 @@ def register_asynchronous_instrument(
def collect(
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
) -> Iterable[Metric]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
Expand All @@ -84,6 +84,4 @@ def collect(
for async_instrument in self._async_instruments:
for measurement in async_instrument.callback(callback_options):
metric_reader_storage.consume_measurement(measurement)
return self._reader_storages[metric_reader].collect(
instrument_type_temporality
)
return self._reader_storages[metric_reader].collect()
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def __init__(
)

self._instrument_class_temporality.update(preferred_temporality or {})
self._preferred_temporality = preferred_temporality
self._instrument_class_aggregation = {
Counter: DefaultAggregation(),
UpDownCounter: DefaultAggregation(),
Expand All @@ -148,8 +147,7 @@ def collect(self, timeout_millis: float = 10_000) -> None:
)
return
self._receive_metrics(
self._collect(self, self._instrument_class_temporality),
timeout_millis=timeout_millis,
self._collect(self), timeout_millis=timeout_millis,
)

@final
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ class MetricReaderStorage:
def __init__(
self,
sdk_config: SdkConfiguration,
instrument_class_temporality: Dict[type, AggregationTemporality],
instrument_class_aggregation: Dict[type, Aggregation],
) -> None:
self._lock = RLock()
self._sdk_config = sdk_config
self._instrument_view_instrument_matches: Dict[
Instrument, List[_ViewInstrumentMatch]
] = {}
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation

def _get_or_init_view_instrument_match(
Expand Down Expand Up @@ -79,6 +81,9 @@ def _get_or_init_view_instrument_match(
view=_DEFAULT_VIEW,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_temporality=(
self._instrument_class_temporality
),
instrument_class_aggregation=(
self._instrument_class_aggregation
),
Expand All @@ -96,9 +101,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
view_instrument_match.consume_measurement(measurement)

def collect(
self, instrument_type_temporality: Dict[type, AggregationTemporality]
) -> Iterable[Metric]:
def collect(self) -> Iterable[Metric]:
# Use a list instead of yielding to prevent a slow reader from holding
# SDK locks
metrics: List[Metric] = []
Expand All @@ -116,11 +119,7 @@ def collect(
view_instrument_matches
) in self._instrument_view_instrument_matches.values():
for view_instrument_match in view_instrument_matches:
metrics.extend(
view_instrument_match.collect(
instrument_type_temporality
)
)
metrics.extend(view_instrument_match.collect())

return metrics

Expand All @@ -141,6 +140,9 @@ def _handle_view_instrument_match(
view=view,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_temporality=(
self._instrument_class_temporality
),
instrument_class_aggregation=(
self._instrument_class_aggregation
),
Expand Down
9 changes: 3 additions & 6 deletions opentelemetry-sdk/tests/metrics/test_measurement_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
)
from opentelemetry.sdk._metrics.point import AggregationTemporality


@patch(
Expand Down Expand Up @@ -85,10 +84,8 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage):
)
for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks):
rs_mock.collect.assert_not_called()
consumer.collect(r_mock, AggregationTemporality.CUMULATIVE)
rs_mock.collect.assert_called_once_with(
AggregationTemporality.CUMULATIVE
)
consumer.collect(r_mock)
rs_mock.collect.assert_called_once_with()

def test_collect_calls_async_instruments(self, MockMetricReaderStorage):
"""Its collect() method should invoke async instruments and pass measurements to the
Expand All @@ -108,7 +105,7 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage):
i_mock.callback.return_value = [Mock()]
consumer.register_asynchronous_instrument(i_mock)

consumer.collect(reader_mock, AggregationTemporality.CUMULATIVE)
consumer.collect(reader_mock)

# it should call async instruments
for i_mock in async_instrument_mocks:
Expand Down
Loading

0 comments on commit bfebe11

Please sign in to comment.