Skip to content

Commit

Permalink
Refactor metric format (#2658)
Browse files Browse the repository at this point in the history
* Refactor metric format

Fixes #2646

* Do not overwrite pb2_scope_metrics

* Refactor for loops

* Add multiple scope test case

* Fix interfaces

* Fix docs

* Update exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py

Co-authored-by: Aaron Abbott <aaronabbott@google.com>

* Fix lint

* Remove resource check

* Remove instrumentation_scope check

* Group metrics by instrumentation scopes in the SDK

* Remove label_keyss

* Use strings instead of mocks

* Return generator instead of a list

* Fix lint

* Rename variables

Co-authored-by: Aaron Abbott <aaronabbott@google.com>
  • Loading branch information
ocelotl and aabmass authored May 12, 2022
1 parent a821311 commit e8fbb08
Show file tree
Hide file tree
Showing 21 changed files with 1,430 additions and 1,161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from logging import getLogger
from os import environ
from typing import Optional, Sequence
from grpc import ChannelCredentials, Compression
Expand Down Expand Up @@ -40,9 +40,10 @@
from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
MetricsData,
)

logger = logging.getLogger(__name__)
_logger = getLogger(__name__)


class OTLPMetricExporter(
Expand Down Expand Up @@ -79,103 +80,127 @@ def __init__(
)

def _translate_data(
    self, data: MetricsData
) -> ExportMetricsServiceRequest:
    """Translate SDK ``MetricsData`` into an OTLP protobuf export request.

    Walks the resource -> scope -> metric hierarchy that the SDK already
    groups for us, building one ``pb2.ScopeMetrics`` per instrumentation
    scope and one ``pb2.Metric`` per SDK metric.

    Args:
        data: metrics grouped by resource and instrumentation scope.

    Returns:
        An ``ExportMetricsServiceRequest`` ready to send over gRPC.
    """
    resource_metrics_dict = {}

    for resource_metrics in data.resource_metrics:

        resource = resource_metrics.resource

        # It is safe to assume that each entry in data.resource_metrics
        # is associated with an unique resource.
        scope_metrics_dict = {}

        resource_metrics_dict[resource] = scope_metrics_dict

        for scope_metrics in resource_metrics.scope_metrics:

            instrumentation_scope = scope_metrics.scope

            # The SDK groups metrics in instrumentation scopes already so
            # there is no need to check for existing instrumentation
            # scopes here.
            pb2_scope_metrics = pb2.ScopeMetrics(
                scope=InstrumentationScope(
                    name=instrumentation_scope.name,
                    version=instrumentation_scope.version,
                )
            )

            scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

            for metric in scope_metrics.metrics:
                pb2_metric = pb2.Metric(
                    name=metric.name,
                    description=metric.description,
                    unit=metric.unit,
                )

                if isinstance(metric.data, Gauge):
                    for data_point in metric.data.data_points:
                        pt = pb2.NumberDataPoint(
                            attributes=self._translate_attributes(
                                data_point.attributes
                            ),
                            time_unix_nano=data_point.time_unix_nano,
                        )
                        # NumberDataPoint value is a oneof; pick the int
                        # or double field based on the Python type.
                        if isinstance(data_point.value, int):
                            pt.as_int = data_point.value
                        else:
                            pt.as_double = data_point.value
                        pb2_metric.gauge.data_points.append(pt)

                elif isinstance(metric.data, Histogram):
                    for data_point in metric.data.data_points:
                        pt = pb2.HistogramDataPoint(
                            attributes=self._translate_attributes(
                                data_point.attributes
                            ),
                            time_unix_nano=data_point.time_unix_nano,
                            start_time_unix_nano=(
                                data_point.start_time_unix_nano
                            ),
                            count=data_point.count,
                            sum=data_point.sum,
                            bucket_counts=data_point.bucket_counts,
                            explicit_bounds=data_point.explicit_bounds,
                        )
                        pb2_metric.histogram.aggregation_temporality = (
                            metric.data.aggregation_temporality
                        )
                        pb2_metric.histogram.data_points.append(pt)

                elif isinstance(metric.data, Sum):
                    for data_point in metric.data.data_points:
                        pt = pb2.NumberDataPoint(
                            attributes=self._translate_attributes(
                                data_point.attributes
                            ),
                            start_time_unix_nano=(
                                data_point.start_time_unix_nano
                            ),
                            time_unix_nano=data_point.time_unix_nano,
                        )
                        if isinstance(data_point.value, int):
                            pt.as_int = data_point.value
                        else:
                            pt.as_double = data_point.value
                        # note that because sum is a message type, the
                        # fields must be set individually rather than
                        # instantiating a pb2.Sum and setting it once
                        pb2_metric.sum.aggregation_temporality = (
                            metric.data.aggregation_temporality
                        )
                        pb2_metric.sum.is_monotonic = (
                            metric.data.is_monotonic
                        )
                        pb2_metric.sum.data_points.append(pt)
                else:
                    # BUG FIX: the refactor moved the point data from
                    # metric.point to metric.data, but this branch still
                    # read metric.point, which would raise AttributeError
                    # instead of warning. Also use warning() — warn() is
                    # a deprecated alias.
                    _logger.warning(
                        "unsupported data type %s", metric.data
                    )
                    continue

                pb2_scope_metrics.metrics.append(pb2_metric)

    return ExportMetricsServiceRequest(
        resource_metrics=get_resource_data(
            resource_metrics_dict,
            pb2.ResourceMetrics,
            "metrics",
        )
    )

def export(
    self,
    metrics_data: MetricsData,
    timeout_millis: float = 10_000,
    **kwargs,
) -> MetricExportResult:
    """Export ``metrics_data`` to the collector over gRPC.

    Args:
        metrics_data: the metrics to export.
        timeout_millis: export deadline in milliseconds (currently
            ignored — see TODO below).

    Returns:
        The result of the underlying ``OTLPExporterMixin._export`` call.
    """
    # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
    return self._export(metrics_data)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
    # Intentional no-op: this exporter holds no state that needs
    # explicit teardown on shutdown.
    pass
Loading

0 comments on commit e8fbb08

Please sign in to comment.