Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor metric format #2658

Merged
merged 16 commits into from
May 12, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from logging import getLogger
from os import environ
from typing import Optional, Sequence
from grpc import ChannelCredentials, Compression
Expand Down Expand Up @@ -40,9 +40,10 @@
from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
MetricsData,
)

logger = logging.getLogger(__name__)
_logger = getLogger(__name__)


class OTLPMetricExporter(
Expand Down Expand Up @@ -79,90 +80,124 @@ def __init__(
)

def _translate_data(
self, data: Sequence[Metric]
self, data: MetricsData
) -> ExportMetricsServiceRequest:
sdk_resource_scope_metrics = {}

for metric in data:
resource = metric.resource
scope_map = sdk_resource_scope_metrics.get(resource, {})
if not scope_map:
sdk_resource_scope_metrics[resource] = scope_map
resource_instrumentation_scope_pb2_scope_metrics = {}
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

scope_metrics = scope_map.get(metric.instrumentation_scope)
for resource_metrics in data.resource_metrics:

if not scope_metrics:
if metric.instrumentation_scope is not None:
scope_map[metric.instrumentation_scope] = pb2.ScopeMetrics(
resource = resource_metrics.resource

instrumentation_scope_pb2_scope_metrics = (
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
resource_instrumentation_scope_pb2_scope_metrics.get(
resource, {}
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
)
)

if not instrumentation_scope_pb2_scope_metrics:
resource_instrumentation_scope_pb2_scope_metrics[
resource
] = instrumentation_scope_pb2_scope_metrics

for scope_metrics in resource_metrics.scope_metrics:

instrumentation_scope = scope_metrics.scope

if instrumentation_scope is None:
instrumentation_scope_pb2_scope_metrics[
None
] = pb2.ScopeMetrics()

else:
instrumentation_scope_pb2_scope_metrics[
instrumentation_scope
] = pb2.ScopeMetrics(
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
scope=InstrumentationScope(
name=metric.instrumentation_scope.name,
version=metric.instrumentation_scope.version,
name=instrumentation_scope.name,
version=instrumentation_scope.version,
)
)
else:
scope_map[
metric.instrumentation_scope
] = pb2.ScopeMetrics()

scope_metrics = scope_map.get(metric.instrumentation_scope)
pb2_scope_metrics = instrumentation_scope_pb2_scope_metrics[
instrumentation_scope
]

for metric in scope_metrics.metrics:
pb2_metric = pb2.Metric(
name=metric.name,
description=metric.description,
unit=metric.unit,
)

for data_point in metric.data.data_points:

if isinstance(metric.data, Gauge):
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
pt = pb2.NumberDataPoint(
attributes=self._translate_attributes(
data_point.attributes
),
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
pb2_metric.gauge.data_points.append(pt)

elif isinstance(metric.data, Histogram):
pt = pb2.HistogramDataPoint(
attributes=self._translate_attributes(
data_point.attributes
),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=(
data_point.start_time_unix_nano
),
count=data_point.count,
sum=data_point.sum,
bucket_counts=data_point.bucket_counts,
explicit_bounds=data_point.explicit_bounds,
)
pb2_metric.histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.histogram.data_points.append(pt)
elif isinstance(metric.data, Sum):
pt = pb2.NumberDataPoint(
attributes=self._translate_attributes(
data_point.attributes
),
start_time_unix_nano=(
data_point.start_time_unix_nano
),
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
# note that because sum is a message type, the
# fields must be set individually rather than
# instantiating a pb2.Sum and setting it once
pb2_metric.sum.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.sum.is_monotonic = (
metric.data.is_monotonic
)
pb2_metric.sum.data_points.append(pt)
else:
_logger.warn(
"unsupported datapoint type %s", metric.point
)
continue

pb2_scope_metrics.metrics.append(pb2_metric)

pbmetric = pb2.Metric(
name=metric.name,
description=metric.description,
unit=metric.unit,
)
if isinstance(metric.point, Gauge):
pt = pb2.NumberDataPoint(
attributes=self._translate_attributes(metric.attributes),
time_unix_nano=metric.point.time_unix_nano,
)
if isinstance(metric.point.value, int):
pt.as_int = metric.point.value
else:
pt.as_double = metric.point.value
pbmetric.gauge.data_points.append(pt)
elif isinstance(metric.point, Histogram):
pt = pb2.HistogramDataPoint(
attributes=self._translate_attributes(metric.attributes),
time_unix_nano=metric.point.time_unix_nano,
start_time_unix_nano=metric.point.start_time_unix_nano,
count=sum(metric.point.bucket_counts),
sum=metric.point.sum,
bucket_counts=metric.point.bucket_counts,
explicit_bounds=metric.point.explicit_bounds,
)
pbmetric.histogram.aggregation_temporality = (
metric.point.aggregation_temporality
)
pbmetric.histogram.data_points.append(pt)
elif isinstance(metric.point, Sum):
pt = pb2.NumberDataPoint(
attributes=self._translate_attributes(metric.attributes),
start_time_unix_nano=metric.point.start_time_unix_nano,
time_unix_nano=metric.point.time_unix_nano,
)
if isinstance(metric.point.value, int):
pt.as_int = metric.point.value
else:
pt.as_double = metric.point.value
# note that because sum is a message type, the fields must be
# set individually rather than instantiating a pb2.Sum and setting
# it once
pbmetric.sum.aggregation_temporality = (
metric.point.aggregation_temporality
)
pbmetric.sum.is_monotonic = metric.point.is_monotonic
pbmetric.sum.data_points.append(pt)
else:
logger.warn("unsupported datapoint type %s", metric.point)
continue

scope_metrics.metrics.append(
pbmetric,
)
return ExportMetricsServiceRequest(
resource_metrics=get_resource_data(
sdk_resource_scope_metrics,
resource_instrumentation_scope_pb2_scope_metrics,
pb2.ResourceMetrics,
"metrics",
)
Expand Down
Loading