From 428c2b64bba01d33b4e2f5a8486c76832f8dd9e2 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 11 Mar 2024 13:31:32 -0700 Subject: [PATCH] Add live metrics collection of requests/dependencies/exceptions (#34673) --- .../CHANGELOG.md | 3 + .../README.md | 2 +- .../exporter/_quickpulse/_constants.py | 16 ++ .../exporter/_quickpulse/_exporter.py | 116 +++------- .../exporter/_quickpulse/_live_metrics.py | 107 ++++++++- .../exporter/_quickpulse/_processor.py | 33 +++ .../exporter/_quickpulse/_state.py | 69 ++++++ .../exporter/_quickpulse/_utils.py | 156 ++++++++++++++ .../monitor/opentelemetry/exporter/_utils.py | 2 +- .../tests/quickpulse/test_exporter.py | 14 +- .../tests/quickpulse/test_live_metrics.py | 129 +++++++++++ .../tests/quickpulse/test_processor.py | 44 ++++ .../tests/quickpulse/test_utils.py | 204 ++++++++++++++++++ 13 files changed, 799 insertions(+), 96 deletions(-) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 8ca743f46635..d589309891ac 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -4,6 +4,9 @@ ### Features Added +- Add live metrics collection of requests/dependencies/exceptions + ([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673)) + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md index 5be012c891d6..e5f360932f58 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md @@ -1,6 +1,6 @@ # Microsoft OpenTelemetry exporter for Azure Monitor -The exporter for Azure Monitor allows Python applications to export data from the OpenTelemetry SDK to Azure Monitor. The exporter is intended for users who require advanced configuration or has more complicated telemetry needs that require all of distributed tracing, logging and metrics. If you have simpler configuration requirements, we recommend using the [Azure Monitor OpenTelemetry Distro](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-enable?tabs=python) instead for a simpler one-line setup. +The exporter for Azure Monitor allows Python applications to export data from the OpenTelemetry SDK to Azure Monitor. The exporter is intended for users who require advanced configuration or have more complicated telemetry needs that require all of distributed tracing, logging and metrics. If you have simpler configuration requirements, we recommend using the [Azure Monitor OpenTelemetry Distro](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-enable?tabs=python) instead for a simpler one-line setup. Prior to using this SDK, please read and understand [Data Collection Basics](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-overview?tabs=python), especially the section on [telemetry types](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-overview?tabs=python#telemetry-types). OpenTelemetry terminology differs from Application Insights terminology so it is important to understand the way the telemetry types map to each other. diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py index b591258f2ee0..94155b73f32c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py @@ -1,5 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from enum import Enum + # cSpell:disable # (OpenTelemetry metric name, Quickpulse metric name) @@ -33,4 +35,18 @@ ] ) +# Quickpulse intervals +_SHORT_PING_INTERVAL_SECONDS = 5 +_POST_INTERVAL_SECONDS = 1 +_LONG_PING_INTERVAL_SECONDS = 60 +_POST_CANCEL_INTERVAL_SECONDS = 20 + +# Live metrics data types +class _DocumentIngressDocumentType(Enum): + Request = "Request" + RemoteDependency = "RemoteDependency" + Exception = "Exception" + Event = "Event" + Trace = "Trace" + # cSpell:disable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index ffcfb33312d0..630294218ee6 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -1,8 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from datetime import datetime, timezone -from enum import Enum -from typing import Any, List, Optional +from typing import Any, Optional from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -13,16 +11,8 @@ from opentelemetry.sdk.metrics import ( Counter, Histogram, - ObservableCounter, - ObservableGauge, - ObservableUpDownCounter, - UpDownCounter, -) -from opentelemetry.sdk.metrics._internal.point import ( - NumberDataPoint, - HistogramDataPoint, - MetricsData, ) +from opentelemetry.sdk.metrics._internal.point import MetricsData from opentelemetry.sdk.metrics.export import ( AggregationTemporality, MetricExporter, @@ -32,31 +22,33 @@ ) from azure.core.exceptions import HttpResponseError -from azure.monitor.opentelemetry.exporter._quickpulse._constants import _QUICKPULSE_METRIC_NAME_MAPPINGS +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _LONG_PING_INTERVAL_SECONDS, + _POST_CANCEL_INTERVAL_SECONDS, + _POST_INTERVAL_SECONDS, +) from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient -from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( - DocumentIngress, - MetricPoint, - MonitoringDataPoint, +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_global_quickpulse_state, + _is_ping_state, + _set_global_quickpulse_state, + _get_and_clear_quickpulse_documents, + _QuickpulseState, +) +from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _metric_to_quick_pulse_data_points, ) from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask -_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = { +_QUICKPULSE_METRIC_TEMPORALITIES = { + # Use DELTA temporalities because we want to reset the counts every collection interval Counter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA, - ObservableCounter: AggregationTemporality.DELTA, - ObservableGauge: AggregationTemporality.CUMULATIVE, - ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, - UpDownCounter: AggregationTemporality.CUMULATIVE, } -_SHORT_PING_INTERVAL_SECONDS = 5 -_POST_INTERVAL_SECONDS = 1 -_LONG_PING_INTERVAL_SECONDS = 60 -_POST_CANCEL_INTERVAL_SECONDS = 20 - class _Response: """Response that encapsulates pipeline response and response headers from @@ -91,7 +83,7 @@ def __init__(self, connection_string: Optional[str]) -> None: MetricExporter.__init__( self, - preferred_temporality=_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore + preferred_temporality=_QUICKPULSE_METRIC_TEMPORALITIES, # type: ignore ) def export( @@ -116,7 +108,7 @@ def export( data_points = _metric_to_quick_pulse_data_points( metrics_data, base_monitoring_data_point=base_monitoring_data_point, - documents=kwargs.get("documents"), + documents=_get_and_clear_quickpulse_documents(), ) token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) @@ -190,16 +182,6 @@ def _ping(self, monitoring_data_point) -> Optional[_Response]: return ping_response -class _QuickpulseState(Enum): - """Current state of quickpulse service. - The numerical value represents the ping/post interval in ms for those states. - """ - - PING_SHORT = _SHORT_PING_INTERVAL_SECONDS - PING_LONG = _LONG_PING_INTERVAL_SECONDS - POST_SHORT = _POST_INTERVAL_SECONDS - - class _QuickpulseMetricReader(MetricReader): def __init__( @@ -208,7 +190,6 @@ def __init__( base_monitoring_data_point: MonitoringDataPoint, ) -> None: self._exporter = exporter - self._quick_pulse_state = _QuickpulseState.PING_SHORT self._base_monitoring_data_point = base_monitoring_data_point self._elapsed_num_seconds = 0 self._worker = PeriodicTask( @@ -224,9 +205,9 @@ def __init__( self._worker.start() def _ticker(self) -> None: - if self._is_ping_state(): + if _is_ping_state(): # Send a ping if elapsed number of request meets the threshold - if self._elapsed_num_seconds % int(self._quick_pulse_state.value) == 0: + if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0: print("pinging...") ping_response = self._exporter._ping( # pylint: disable=protected-access self._base_monitoring_data_point, @@ -236,22 +217,22 @@ def _ticker(self) -> None: if header and header == "true": print("ping succeeded: switching to post") # Switch state to post if subscribed - self._quick_pulse_state = _QuickpulseState.POST_SHORT + _set_global_quickpulse_state(_QuickpulseState.POST_SHORT) self._elapsed_num_seconds = 0 else: # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests - if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \ + if _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT and \ self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS: print("ping failed for 60s, switching to pinging every 60s") - self._quick_pulse_state = _QuickpulseState.PING_LONG + _set_global_quickpulse_state(_QuickpulseState.PING_LONG) # TODO: Implement redirect else: # Erroneous ping responses instigate backoff logic # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests - if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \ + if _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT and \ self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS: print("ping failed for 60s, switching to pinging every 60s") - self._quick_pulse_state = _QuickpulseState.PING_LONG + _set_global_quickpulse_state(_QuickpulseState.PING_LONG) else: print("posting...") try: @@ -262,7 +243,7 @@ def _ticker(self) -> None: # And resume pinging if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS: print("post failed for 20s, switching to pinging") - self._quick_pulse_state = _QuickpulseState.PING_SHORT + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) self._elapsed_num_seconds = 0 self._elapsed_num_seconds += 1 @@ -277,7 +258,6 @@ def _receive_metrics( metrics_data, timeout_millis=timeout_millis, base_monitoring_data_point=self._base_monitoring_data_point, - documents=[], ) if result is MetricExportResult.FAILURE: # There is currently no way to propagate unsuccessful metric post so @@ -288,41 +268,3 @@ def _receive_metrics( def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._worker.cancel() self._worker.join() - - def _is_ping_state(self): - return self._quick_pulse_state in (_QuickpulseState.PING_SHORT, _QuickpulseState.PING_LONG) - -def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks - metrics_data: OTMetricsData, - base_monitoring_data_point: MonitoringDataPoint, - documents: Optional[List[DocumentIngress]], -) -> List[MonitoringDataPoint]: - metric_points = [] - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - for point in metric.data.data_points: - if point is not None: - metric_point = MetricPoint( - name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()], - weight=1, - ) - if isinstance(point, HistogramDataPoint): - metric_point.value = point.sum - elif isinstance(point, NumberDataPoint): - metric_point.value = point.value - else: - metric_point.value = 0 - metric_points.append(metric_point) - return [ - MonitoringDataPoint( - version=base_monitoring_data_point.version, - instance=base_monitoring_data_point.instance, - role_name=base_monitoring_data_point.role_name, - machine_name=base_monitoring_data_point.machine_name, - stream_id=base_monitoring_data_point.stream_id, - timestamp=datetime.now(tz=timezone.utc), - metrics=metric_points, - documents=documents, - ) - ] diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 83bb3e073e3d..c84ae7b2a906 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -3,15 +3,39 @@ import platform from typing import Any, Optional +from opentelemetry.sdk._logs import LogData from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.id_generator import RandomIdGenerator +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind + from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _DEPENDENCY_DURATION_NAME, + _DEPENDENCY_FAILURE_RATE_NAME, + _DEPENDENCY_RATE_NAME, + _EXCEPTION_RATE_NAME, + _REQUEST_DURATION_NAME, + _REQUEST_FAILURE_RATE_NAME, + _REQUEST_RATE_NAME, +) from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( _QuickpulseExporter, _QuickpulseMetricReader, ) from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _QuickpulseState, + _is_post_state, + _append_quickpulse_document, + _set_global_quickpulse_state, +) +from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _get_log_record_document, + _get_span_document, +) from azure.monitor.opentelemetry.exporter._utils import ( _get_sdk_version, _populate_part_a_fields, @@ -20,7 +44,7 @@ def enable_live_metrics(**kwargs: Any) -> None: - """Azure Monitor base exporter for OpenTelemetry. + """Live metrics entry point. :keyword str connection_string: The connection string used for your Application Insights resource. :keyword Resource resource: The OpenTelemetry Resource used for this Python application. @@ -29,9 +53,11 @@ def enable_live_metrics(**kwargs: Any) -> None: _QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource')) +# pylint: disable=protected-access,too-many-instance-attributes class _QuickpulseManager(metaclass=Singleton): def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None: + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) self._exporter = _QuickpulseExporter(connection_string) part_a_fields = {} if resource: @@ -47,3 +73,80 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource ) self._reader = _QuickpulseMetricReader(self._exporter, self._base_monitoring_data_point) self._meter_provider = MeterProvider([self._reader]) + self._meter = self._meter_provider.get_meter("azure_monitor_live_metrics") + + self._request_duration = self._meter.create_histogram( + _REQUEST_DURATION_NAME[0], + "ms", + "live metrics avg request duration in ms" + ) + self._dependency_duration = self._meter.create_histogram( + _DEPENDENCY_DURATION_NAME[0], + "ms", + "live metrics avg dependency duration in ms" + ) + # We use a counter to represent rates per second because collection + # interval is one second so we simply need the number of requests + # within the collection interval + self._request_rate_counter = self._meter.create_counter( + _REQUEST_RATE_NAME[0], + "req/sec", + "live metrics request rate per second" + ) + self._request_failed_rate_counter = self._meter.create_counter( + _REQUEST_FAILURE_RATE_NAME[0], + "req/sec", + "live metrics request failed rate per second" + ) + self._dependency_rate_counter = self._meter.create_counter( + _DEPENDENCY_RATE_NAME[0], + "dep/sec", + "live metrics dependency rate per second" + ) + self._dependency_failure_rate_counter = self._meter.create_counter( + _DEPENDENCY_FAILURE_RATE_NAME[0], + "dep/sec", + "live metrics dependency failure rate per second" + ) + self._exception_rate_counter = self._meter.create_counter( + _EXCEPTION_RATE_NAME[0], + "exc/sec", + "live metrics exception rate per second" + ) + + def _record_span(self, span: ReadableSpan) -> None: + # Only record if in post state + if _is_post_state(): + document = _get_span_document(span) + _append_quickpulse_document(document) + duration_ms = 0 + if span.end_time and span.start_time: + duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore + # TODO: Spec out what "success" is + success = span.status.is_ok + + if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): + if success: + self._request_rate_counter.add(1) + else: + self._request_failed_rate_counter.add(1) + self._request_duration.record(duration_ms) + else: + if success: + self._dependency_rate_counter.add(1) + else: + self._dependency_failure_rate_counter.add(1) + self._dependency_duration.record(duration_ms) + + def _record_log_record(self, log_data: LogData) -> None: + # Only record if in post state + if _is_post_state(): + if log_data.log_record: + log_record = log_data.log_record + if log_record.attributes: + document = _get_log_record_document(log_data) + _append_quickpulse_document(document) + exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) + exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + if exc_type is not None or exc_message is not None: + self._exception_rate_counter.add(1) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py new file mode 100644 index 000000000000..2e18253976fb --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from opentelemetry.sdk._logs import LogData, LogRecordProcessor +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor + +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager + + +# pylint: disable=protected-access +class _QuickpulseLogRecordProcessor(LogRecordProcessor): + + def emit(self, log_data: LogData) -> None: + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_log_record(log_data) + super().emit(log_data) + + def shutdown(self): + pass + + def force_flush(self, timeout_millis: int = 30000): + super().force_flush(timeout_millis=timeout_millis) + + +# pylint: disable=protected-access +class _QuickpulseSpanProcessor(SpanProcessor): + + def on_end(self, span: ReadableSpan) -> None: + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_span(span) + return super().on_end(span) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py new file mode 100644 index 000000000000..05c78c72ebb5 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py @@ -0,0 +1,69 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from enum import Enum +from typing import List + +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _LONG_PING_INTERVAL_SECONDS, + _POST_INTERVAL_SECONDS, + _SHORT_PING_INTERVAL_SECONDS, +) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import DocumentIngress + + +class _QuickpulseState(Enum): + """Current state of quickpulse service. + The numerical value represents the ping/post interval in ms for those states. + """ + OFFLINE = 0 + PING_SHORT = _SHORT_PING_INTERVAL_SECONDS + PING_LONG = _LONG_PING_INTERVAL_SECONDS + POST_SHORT = _POST_INTERVAL_SECONDS + + +_GLOBAL_QUICKPULSE_STATE = _QuickpulseState.OFFLINE +_QUICKPULSE_DOCUMENTS: List[DocumentIngress] = [] + +def _set_global_quickpulse_state(state: _QuickpulseState): + # pylint: disable=global-statement + global _GLOBAL_QUICKPULSE_STATE + _GLOBAL_QUICKPULSE_STATE = state + + +def _get_global_quickpulse_state() -> _QuickpulseState: + return _GLOBAL_QUICKPULSE_STATE + + +def is_quickpulse_enabled() -> bool: + return _get_global_quickpulse_state() is not _QuickpulseState.OFFLINE + + +def _is_ping_state() -> bool: + return _get_global_quickpulse_state() in ( + _QuickpulseState.PING_SHORT, + _QuickpulseState.PING_LONG + ) + + +def _is_post_state(): + return _get_global_quickpulse_state() is _QuickpulseState.POST_SHORT + + +def _append_quickpulse_document(document: DocumentIngress): + # pylint: disable=global-statement,global-variable-not-assigned + global _QUICKPULSE_DOCUMENTS + # Limit risk of memory leak by limiting doc length to something manageable + if len(_QUICKPULSE_DOCUMENTS) > 20: + try: + _QUICKPULSE_DOCUMENTS.pop(0) + except IndexError: + pass + _QUICKPULSE_DOCUMENTS.append(document) + + +def _get_and_clear_quickpulse_documents() -> List[DocumentIngress]: + # pylint: disable=global-statement + global _QUICKPULSE_DOCUMENTS + documents = list(_QUICKPULSE_DOCUMENTS) + _QUICKPULSE_DOCUMENTS = [] + return documents diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py new file mode 100644 index 000000000000..204b47d2ed01 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -0,0 +1,156 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from datetime import datetime, timedelta, timezone +from typing import List, Optional, Union + +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk.metrics._internal.point import ( + NumberDataPoint, + HistogramDataPoint, +) +from opentelemetry.sdk.metrics.export import MetricsData as OTMetricsData +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind +from opentelemetry.util.types import Attributes + +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _DocumentIngressDocumentType, + _QUICKPULSE_METRIC_NAME_MAPPINGS, +) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( + DocumentIngress, + Exception as ExceptionDocument, + MetricPoint, + MonitoringDataPoint, + RemoteDependency as RemoteDependencyDocument, + Request as RequestDocument, + Trace as TraceDocument, +) +def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks + metrics_data: OTMetricsData, + base_monitoring_data_point: MonitoringDataPoint, + documents: Optional[List[DocumentIngress]], +) -> List[MonitoringDataPoint]: + metric_points = [] + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for point in metric.data.data_points: + if point is not None: + metric_point = MetricPoint( + name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()], + weight=1, + ) + if isinstance(point, HistogramDataPoint): + if point.count > 0: + metric_point.value = point.sum / point.count + else: + metric_point.value = 0 + elif isinstance(point, NumberDataPoint): + metric_point.value = point.value + else: + metric_point.value = 0 + metric_points.append(metric_point) + return [ + MonitoringDataPoint( + version=base_monitoring_data_point.version, + instance=base_monitoring_data_point.instance, + role_name=base_monitoring_data_point.role_name, + machine_name=base_monitoring_data_point.machine_name, + stream_id=base_monitoring_data_point.stream_id, + timestamp=datetime.now(tz=timezone.utc), + metrics=metric_points, + documents=documents, + ) + ] + +# mypy: disable-error-code="assignment,union-attr" +def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, RequestDocument]: + duration = 0 + if span.end_time and span.start_time: + duration = span.end_time - span.start_time + status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE, "") # type: ignore + grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE, "") # type: ignore + span_kind = span.kind + url = _get_url(span_kind, span.attributes) + if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL): + document = RemoteDependencyDocument( + document_type=_DocumentIngressDocumentType.RemoteDependency.value, + name=span.name, + command_name=url, + result_code=str(status_code), + duration=_ns_to_iso8601_string(duration), + ) + else: + if status_code: + code = str(status_code) + else: + code = str(grpc_status_code) + document = RequestDocument( + document_type=_DocumentIngressDocumentType.Request.value, + name=span.name, + url=url, + response_code=code, + duration=_ns_to_iso8601_string(duration), + ) + return document + +# mypy: disable-error-code="assignment" +def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]: + exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) # type: ignore + exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) # type: ignore + if exc_type is not None or exc_message is not None: + document = ExceptionDocument( + document_type=_DocumentIngressDocumentType.Exception.value, + exception_type=str(exc_type), + exception_message=str(exc_message), + ) + else: + document = TraceDocument( + document_type=_DocumentIngressDocumentType.Trace.value, + message=log_data.log_record.body, + ) + return document + + +# mypy: disable-error-code="assignment" +# pylint: disable=no-else-return +def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: + if not attributes: + return "" + http_method = attributes.get(SpanAttributes.HTTP_METHOD) + if http_method: + http_scheme = attributes.get(SpanAttributes.HTTP_SCHEME) + # Client + if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER): + http_url = attributes.get(SpanAttributes.HTTP_URL) + if http_url: + return str(http_url) + + host = attributes.get(SpanAttributes.NET_PEER_NAME) + port = attributes.get(SpanAttributes.NET_PEER_PORT, "") + ip = attributes.get(SpanAttributes.NET_PEER_IP) + if http_scheme: + if host: + return f"{http_scheme}://{host}:{port}" + else: + return f"{http_scheme}://{ip}:{port}" + else: # Server + host = attributes.get(SpanAttributes.NET_HOST_NAME) + port = attributes.get(SpanAttributes.NET_HOST_PORT) + http_target = attributes.get(SpanAttributes.HTTP_TARGET, "") + if http_scheme and host: + http_host = attributes.get(SpanAttributes.HTTP_HOST) + if http_host: + return f"{http_scheme}://{http_host}:{port}{http_target}" + return "" + + +def _ns_to_iso8601_string(nanoseconds: int) -> str: + seconds, nanoseconds_remainder = divmod(nanoseconds, 1e9) + microseconds = nanoseconds_remainder // 1000 # Convert nanoseconds to microseconds + dt = datetime.utcfromtimestamp(seconds) + dt_microseconds = timedelta(microseconds=microseconds) + dt_with_microseconds = dt + dt_microseconds + return dt_with_microseconds.isoformat() diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py index a1510d736290..9ef51c6202f5 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py @@ -116,7 +116,7 @@ def _getlocale(): } -def ns_to_duration(nanoseconds: int): +def ns_to_duration(nanoseconds: int) -> str: value = (nanoseconds + 500000) // 1000000 # duration in milliseconds value, microseconds = divmod(value, 1000) value, seconds = divmod(value, 60) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py index d053b3da4e59..18f40e87c303 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py @@ -27,6 +27,11 @@ _Response, _UnsuccessfulQuickPulsePostError, ) +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_global_quickpulse_state, + _set_global_quickpulse_state, + _QuickpulseState, +) def throw(exc_type, *args, **kwargs): @@ -39,6 +44,7 @@ def func(*_args, **_kwargs): class TestQuickpulse(unittest.TestCase): @classmethod def setUpClass(cls): + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) cls._resource = Resource.create( { ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance", @@ -209,10 +215,9 @@ def test_quickpulsereader_init(self, task_mock): self._data_point, ) self.assertEqual(reader._exporter, self._exporter) - self.assertEqual(reader._quick_pulse_state, _QuickpulseState.PING_SHORT) + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_SHORT) self.assertEqual(reader._base_monitoring_data_point, self._data_point) self.assertEqual(reader._elapsed_num_seconds, 0) - self.assertEqual(reader._elapsed_num_seconds, 0) self.assertEqual(reader._worker, task_inst_mock) task_mock.assert_called_with( interval=_POST_INTERVAL_SECONDS, @@ -231,7 +236,7 @@ def test_quickpulsereader_ticker_ping_true(self, task_mock, ping_mock): self._exporter, self._data_point, ) - reader._quick_pulse_state = _QuickpulseState.PING_SHORT + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) reader._elapsed_num_seconds = _QuickpulseState.PING_SHORT.value ping_mock.return_value = _Response( None, @@ -244,7 +249,7 @@ def test_quickpulsereader_ticker_ping_true(self, task_mock, ping_mock): ping_mock.assert_called_once_with( self._data_point, ) - self.assertEqual(reader._quick_pulse_state, _QuickpulseState.POST_SHORT) + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.POST_SHORT) self.assertEqual(reader._elapsed_num_seconds, 1) # TODO: Other ticker cases @@ -267,7 +272,6 @@ def test_quickpulsereader_receive_metrics(self, task_mock, export_mock): self._metrics_data, timeout_millis=20_000, base_monitoring_data_point=self._data_point, - documents=[], ) @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.export") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index 0b0f512865bf..f1699bd6beb7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -7,6 +7,8 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource, ResourceAttributes +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( @@ -17,6 +19,11 @@ enable_live_metrics, _QuickpulseManager, ) +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_global_quickpulse_state, + _set_global_quickpulse_state, + _QuickpulseState, +) from azure.monitor.opentelemetry.exporter._utils import ( _get_sdk_version, _populate_part_a_fields, @@ -36,6 +43,13 @@ def test_enable_live_metrics(self, manager_mock): class TestQuickpulseManager(unittest.TestCase): + @classmethod + def setUpClass(cls): + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) + + @classmethod + def tearDownClass(cls): + _set_global_quickpulse_state(_QuickpulseState.OFFLINE) @mock.patch("opentelemetry.sdk.trace.id_generator.RandomIdGenerator.generate_trace_id") def test_init(self, generator_mock): @@ -51,6 +65,7 @@ def test_init(self, generator_mock): connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", resource=resource, ) + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_SHORT) self.assertTrue(isinstance(qpm._exporter, _QuickpulseExporter)) self.assertEqual( qpm._exporter._live_endpoint, @@ -118,3 +133,117 @@ def test_singleton(self): qpm2._base_monitoring_data_point.role_name, part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, "") ) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_server_success(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = True + span_mock.kind = SpanKind.SERVER + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._request_rate_counter = mock.Mock() + qpm._request_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._request_rate_counter.add.assert_called_once_with(1) + qpm._request_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_server_failure(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = False + span_mock.kind = SpanKind.SERVER + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._request_failed_rate_counter = mock.Mock() + qpm._request_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._request_failed_rate_counter.add.assert_called_once_with(1) + qpm._request_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_dep_success(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = True + span_mock.kind = SpanKind.CLIENT + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._dependency_rate_counter = mock.Mock() + qpm._dependency_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._dependency_rate_counter.add.assert_called_once_with(1) + qpm._dependency_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_dep_failure(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = False + span_mock.kind = SpanKind.CLIENT + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._dependency_failure_rate_counter = mock.Mock() + qpm._dependency_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._dependency_failure_rate_counter.add.assert_called_once_with(1) + qpm._dependency_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_log_record_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_log_exception(self, post_state_mock, log_doc_mock, append_doc_mock): + post_state_mock.return_value = True + log_record_doc = mock.Mock() + log_doc_mock.return_value = log_record_doc + log_data_mock = mock.Mock() + attributes = { + SpanAttributes.EXCEPTION_TYPE: "exc_type", + SpanAttributes.EXCEPTION_MESSAGE: "exc_msg", + } + log_data_mock.log_record.attributes = attributes + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._exception_rate_counter = mock.Mock() + qpm._record_log_record(log_data_mock) + append_doc_mock.assert_called_once_with(log_record_doc) + qpm._exception_rate_counter.add.assert_called_once_with(1) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py new file mode 100644 index 000000000000..f3948b31e593 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py @@ -0,0 +1,44 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import unittest +from unittest import mock + +from azure.monitor.opentelemetry.exporter._quickpulse._processor import ( + _QuickpulseLogRecordProcessor, + _QuickpulseSpanProcessor, +) +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager + + +class TestQuickpulseLogRecordProcessor(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.qpm = mock.Mock() + _QuickpulseManager._instance = cls.qpm + + @classmethod + def tearDownClass(cls) -> None: + _QuickpulseManager._instance = None + + def test_emit(self): + processor = _QuickpulseLogRecordProcessor() + log_data = mock.Mock() + processor.emit(log_data) + self.qpm._record_log_record.assert_called_once_with(log_data) + + +class TestQuickpulseSpanProcessor(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.qpm = mock.Mock() + _QuickpulseManager._instance = cls.qpm + + @classmethod + def tearDownClass(cls) -> None: + _QuickpulseManager._instance = None + + def test_on_end(self): + processor = _QuickpulseSpanProcessor() + span = mock.Mock() + processor.on_end(span) + self.qpm._record_span.assert_called_once_with(span) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py new file mode 100644 index 000000000000..ea03e61f5a7d --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py @@ -0,0 +1,204 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from datetime import datetime +import unittest +from unittest import mock + +from opentelemetry.sdk.metrics.export import HistogramDataPoint, NumberDataPoint +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind + +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _COMMITTED_BYTES_NAME, + _DocumentIngressDocumentType, +) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models._models import ( + Exception, + MetricPoint, + MonitoringDataPoint, + RemoteDependency, + Request, + Trace, +) +from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _get_span_document, + _get_log_record_document, + _metric_to_quick_pulse_data_points, +) + + +class TestUtils(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.base_mdp = MonitoringDataPoint( + version=1.0, + instance="test_instance", + role_name="test_role_name", + machine_name="test_machine_name", + stream_id="test_stream_id" + ) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils.datetime") + def test_metric_to_qp_data_point_hist(self, datetime_mock): + point = HistogramDataPoint( + {}, + 0, + 0, + 2, + 10, + [1,1,0], + [0,10,20], + 0, + 5, + ) + metric = mock.Mock() + metric.name = _COMMITTED_BYTES_NAME[0] + metric.data.data_points = [point] + scope_metric = mock.Mock() + scope_metric.metrics = [metric] + resource_metric = mock.Mock() + resource_metric.scope_metrics = [scope_metric] + metric_data = mock.Mock() + metric_data.resource_metrics = [resource_metric] + metric_point = MetricPoint( + name=_COMMITTED_BYTES_NAME[1], + weight=1, + value=5 + ) + documents = [mock.Mock()] + date_now = datetime.now() + datetime_mock.now.return_value = date_now + mdp = _metric_to_quick_pulse_data_points(metric_data, self.base_mdp, documents)[0] + self.assertEqual(mdp.version, self.base_mdp.version) + self.assertEqual(mdp.instance, self.base_mdp.instance) + self.assertEqual(mdp.role_name, self.base_mdp.role_name) + self.assertEqual(mdp.machine_name, self.base_mdp.machine_name) + self.assertEqual(mdp.stream_id, self.base_mdp.stream_id) + self.assertEqual(mdp.timestamp, date_now) + self.assertEqual(mdp.metrics, [metric_point]) + self.assertEqual(mdp.documents, documents) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils.datetime") + def test_metric_to_qp_data_point_num(self, datetime_mock): + point = NumberDataPoint( + {}, + 0, + 0, + 7, + ) + metric = mock.Mock() + metric.name = _COMMITTED_BYTES_NAME[0] + metric.data.data_points = [point] + scope_metric = mock.Mock() + scope_metric.metrics = [metric] + resource_metric = mock.Mock() + resource_metric.scope_metrics = [scope_metric] + metric_data = mock.Mock() + metric_data.resource_metrics = [resource_metric] + metric_point = MetricPoint( + name=_COMMITTED_BYTES_NAME[1], + weight=1, + value=7 + ) + documents = [mock.Mock()] + date_now = datetime.now() + datetime_mock.now.return_value = date_now + mdp = _metric_to_quick_pulse_data_points(metric_data, self.base_mdp, documents)[0] + self.assertEqual(mdp.version, self.base_mdp.version) + self.assertEqual(mdp.instance, self.base_mdp.instance) + self.assertEqual(mdp.role_name, self.base_mdp.role_name) + self.assertEqual(mdp.machine_name, self.base_mdp.machine_name) + self.assertEqual(mdp.stream_id, self.base_mdp.stream_id) + self.assertEqual(mdp.timestamp, date_now) + self.assertEqual(mdp.metrics, [metric_point]) + self.assertEqual(mdp.documents, documents) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") + def test_get_span_document_client(self, url_mock, iso_mock): + span_mock = mock.Mock() + span_mock.name = "test_span" + span_mock.end_time = 10 + span_mock.start_time = 4 + span_mock.attributes = { + SpanAttributes.HTTP_STATUS_CODE: "200", + SpanAttributes.RPC_GRPC_STATUS_CODE: "400", + } + span_mock.kind = SpanKind.CLIENT + url_mock.return_value = "test_url" + iso_mock.return_value = "1000" + doc = _get_span_document(span_mock) + self.assertTrue(isinstance(doc, RemoteDependency)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.RemoteDependency.value) + self.assertEqual(doc.name, "test_span") + self.assertEqual(doc.command_name, "test_url") + self.assertEqual(doc.result_code, "200") + self.assertEqual(doc.duration, "1000") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") + def test_get_span_document_server(self, url_mock, iso_mock): + span_mock = mock.Mock() + span_mock.name = "test_span" + span_mock.end_time = 10 + span_mock.start_time = 4 + span_mock.attributes = { + SpanAttributes.HTTP_STATUS_CODE: "200", + SpanAttributes.RPC_GRPC_STATUS_CODE: "400", + } + span_mock.kind = SpanKind.SERVER + url_mock.return_value = "test_url" + iso_mock.return_value = "1000" + doc = _get_span_document(span_mock) + self.assertTrue(isinstance(doc, Request)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Request.value) + self.assertEqual(doc.name, "test_span") + self.assertEqual(doc.url, "test_url") + self.assertEqual(doc.response_code, "200") + self.assertEqual(doc.duration, "1000") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") + def test_get_span_document_server_grpc_status(self, url_mock, iso_mock): + span_mock = mock.Mock() + span_mock.name = "test_span" + span_mock.end_time = 10 + span_mock.start_time = 4 + span_mock.attributes = { + SpanAttributes.RPC_GRPC_STATUS_CODE: "400", + } + span_mock.kind = SpanKind.SERVER + url_mock.return_value = "test_url" + iso_mock.return_value = "1000" + doc = _get_span_document(span_mock) + self.assertTrue(isinstance(doc, Request)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Request.value) + self.assertEqual(doc.name, "test_span") + self.assertEqual(doc.url, "test_url") + self.assertEqual(doc.response_code, "400") + self.assertEqual(doc.duration, "1000") + + def test_get_log_record_document_server_exc(self): + log_record = mock.Mock() + log_record.attributes = { + SpanAttributes.EXCEPTION_TYPE: "exc_type", + SpanAttributes.EXCEPTION_MESSAGE: "exc_message", + } + log_data = mock.Mock() + log_data.log_record = log_record + doc = _get_log_record_document(log_data) + self.assertTrue(isinstance(doc, Exception)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Exception.value) + self.assertEqual(doc.exception_type, "exc_type") + self.assertEqual(doc.exception_message, "exc_message") + + def test_get_log_record_document_server_exc(self): + log_record = mock.Mock() + log_record.attributes = {} + log_record.body = "body" + log_data = mock.Mock() + log_data.log_record = log_record + doc = _get_log_record_document(log_data) + self.assertTrue(isinstance(doc, Trace)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Trace.value) + self.assertEqual(doc.message, "body")