Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pylint==3.3.4
httpretty==1.1.4
pyright==1.1.396
pyright==1.1.405
sphinx==7.1.2
sphinx-rtd-theme==2.0.0rc4
sphinx-autodoc-typehints==1.25.2
Expand Down
12 changes: 12 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@
"py:class",
"opentelemetry.trace._LinkBase",
),
(
"py:class",
"opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc.TraceServiceStub",
),
(
"py:class",
"opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc.MetricsServiceStub",
),
(
"py:class",
"opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc.LogsServiceStub",
),
(
"py:class",
"opentelemetry.exporter.otlp.proto.grpc.exporter.OTLPExporterMixin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# limitations under the License.

from os import environ
from typing import Dict, Optional, Sequence, Tuple, Union
from typing import Dict, Literal, Optional, Sequence, Tuple, Union
from typing import Sequence as TypingSequence

from grpc import ChannelCredentials, Compression
Expand All @@ -29,7 +29,6 @@
LogsServiceStub,
)
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs import LogRecord as SDKLogRecord
from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_LOGS_CREDENTIAL_PROVIDER,
Expand All @@ -46,11 +45,13 @@

class OTLPLogExporter(
LogExporter,
OTLPExporterMixin[SDKLogRecord, ExportLogsServiceRequest, LogExportResult],
OTLPExporterMixin[
Sequence[LogData],
ExportLogsServiceRequest,
LogExportResult,
LogsServiceStub,
],
):
_result = LogExportResult
_stub = LogsServiceStub

def __init__(
self,
endpoint: Optional[str] = None,
Expand All @@ -61,12 +62,11 @@ def __init__(
] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
channel_options: Optional[TypingSequence[Tuple[str, str]]] = None,
channel_options: Optional[Tuple[Tuple[str, str]]] = None,
):
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
if insecure is not None:
insecure = insecure.lower() == "true"
insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
if insecure is None and insecure_logs is not None:
insecure = insecure_logs.lower() == "true"

if (
not insecure
Expand All @@ -90,29 +90,30 @@ def __init__(
if compression is None
else compression
)
endpoint = endpoint or environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)

headers = headers or environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS)

super().__init__(
**{
"endpoint": endpoint,
"insecure": insecure,
"credentials": credentials,
"headers": headers,
"timeout": timeout or environ_timeout,
"compression": compression,
"channel_options": channel_options,
}
OTLPExporterMixin.__init__(
self,
endpoint=endpoint or environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT),
insecure=insecure,
credentials=credentials,
headers=headers or environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS),
timeout=timeout or environ_timeout,
compression=compression,
stub=LogsServiceStub,
result=LogExportResult,
channel_options=channel_options,
)

def _translate_data(
self, data: Sequence[LogData]
) -> ExportLogsServiceRequest:
return encode_logs(data)

def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._export(batch)
def export( # type: ignore [reportIncompatibleMethodOverride]
self,
batch: Sequence[LogData],
) -> Literal[LogExportResult.SUCCESS, LogExportResult.FAILURE]:
return OTLPExporterMixin._export(self, batch)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
Dict,
Generic,
List,
Literal,
NewType,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
Expand All @@ -53,12 +56,32 @@
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_CHANNEL_OPTIONS,
)
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import (
LogsServiceStub,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
MetricsServiceStub,
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import (
TraceServiceStub,
)
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
AnyValue,
ArrayValue,
KeyValue,
)
from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.export import LogExportResult
from opentelemetry.sdk._shared_internal import DuplicateFilter
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
Expand All @@ -71,9 +94,10 @@
OTEL_EXPORTER_OTLP_INSECURE,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.metrics.export import MetricExportResult, MetricsData
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.util._importlib_metadata import entry_points
from opentelemetry.util.re import parse_env_headers

Expand All @@ -92,11 +116,29 @@
logger = getLogger(__name__)
# This prevents logs generated when a log fails to be written to generate another log which fails to be written etc. etc.
logger.addFilter(DuplicateFilter())
SDKDataT = TypeVar("SDKDataT")
SDKDataT = TypeVar(
"SDKDataT",
TypingSequence[LogData],
MetricsData,
TypingSequence[ReadableSpan],
)
ResourceDataT = TypeVar("ResourceDataT")
TypingResourceT = TypeVar("TypingResourceT")
ExportServiceRequestT = TypeVar("ExportServiceRequestT")
ExportResultT = TypeVar("ExportResultT")
ExportServiceRequestT = TypeVar(
"ExportServiceRequestT",
ExportTraceServiceRequest,
ExportMetricsServiceRequest,
ExportLogsServiceRequest,
)
ExportResultT = TypeVar(
"ExportResultT",
LogExportResult,
MetricExportResult,
SpanExportResult,
)
ExportStubT = TypeVar(
"ExportStubT", TraceServiceStub, MetricsServiceStub, LogsServiceStub
)

_ENVIRON_TO_COMPRESSION = {
None: None,
Expand All @@ -119,7 +161,10 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:
if environ_key in environ
else None
)
if environ_value not in _ENVIRON_TO_COMPRESSION:
if (
environ_value not in _ENVIRON_TO_COMPRESSION
and environ_value is not None
):
raise InvalidCompressionValueException(environ_key, environ_value)
return _ENVIRON_TO_COMPRESSION[environ_value]

Expand Down Expand Up @@ -151,7 +196,7 @@ def _load_credentials(
certificate_file: Optional[str],
client_key_file: Optional[str],
client_certificate_file: Optional[str],
) -> Optional[ChannelCredentials]:
) -> ChannelCredentials:
root_certificates = (
_read_file(certificate_file) if certificate_file else None
)
Expand Down Expand Up @@ -214,7 +259,7 @@ def _get_credentials(

# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
):
"""OTLP span exporter

Expand All @@ -230,6 +275,8 @@ class OTLPExporterMixin(

def __init__(
self,
stub: ExportStubT,
result: ExportResultT,
endpoint: Optional[str] = None,
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
Expand All @@ -238,10 +285,11 @@ def __init__(
] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
channel_options: Optional[TypingSequence[Tuple[str, str]]] = None,
channel_options: Optional[Tuple[Tuple[str, str]]] = None,
):
super().__init__()

self._result = result
self._stub = stub
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:4317"
)
Expand All @@ -250,15 +298,12 @@ def __init__(

if parsed_url.scheme == "https":
insecure = False
insecure_exporter = environ.get(OTEL_EXPORTER_OTLP_INSECURE)
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_INSECURE)
if insecure is not None:
insecure = insecure.lower() == "true"
if insecure_exporter is not None:
insecure = insecure_exporter.lower() == "true"
else:
if parsed_url.scheme == "http":
insecure = True
else:
insecure = False
insecure = parsed_url.scheme == "http"

if parsed_url.netloc:
self._endpoint = parsed_url.netloc
Expand All @@ -277,12 +322,12 @@ def __init__(
overridden_options = {
opt_name for (opt_name, _) in channel_options
}
default_options = [
default_options = tuple(
(opt_name, opt_value)
for opt_name, opt_value in _OTLP_GRPC_CHANNEL_OPTIONS
if opt_name not in overridden_options
]
self._channel_options = tuple(default_options) + channel_options
)
self._channel_options = default_options + channel_options
else:
self._channel_options = tuple(_OTLP_GRPC_CHANNEL_OPTIONS)

Expand Down Expand Up @@ -317,24 +362,25 @@ def __init__(
compression=compression,
options=self._channel_options,
)
self._client = self._stub(self._channel)
self._client = self._stub(self._channel) # type: ignore [reportCallIssue]

self._shutdown_in_progress = threading.Event()
self._shutdown = False

@abstractmethod
def _translate_data(
self, data: TypingSequence[SDKDataT]
self,
data: SDKDataT,
) -> ExportServiceRequestT:
pass

def _export(
self,
data: Union[TypingSequence[ReadableSpan], MetricsData],
data: SDKDataT,
) -> ExportResultT:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE
return self._result.FAILURE # type: ignore [reportReturnType]

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
Expand All @@ -347,10 +393,10 @@ def _export(
metadata=self._headers,
timeout=deadline_sec - time(),
)
return self._result.SUCCESS
return self._result.SUCCESS # type: ignore [reportReturnType]
except RpcError as error:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue]
"google.rpc.retryinfo-bin" # type: ignore [reportArgumentType]
)
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
Expand All @@ -362,7 +408,7 @@ def _export(
+ retry_info.retry_delay.nanos / 1.0e9
)
if (
error.code() not in _RETRYABLE_ERROR_CODES
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
Expand All @@ -371,13 +417,13 @@ def _export(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
error.code(), # type: ignore [reportAttributeAccessIssue]
exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue]
)
return self._result.FAILURE
return self._result.FAILURE # type: ignore [reportReturnType]
logger.warning(
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
error.code(),
error.code(), # type: ignore [reportAttributeAccessIssue]
self._exporting,
self._endpoint,
backoff_seconds,
Expand All @@ -387,7 +433,7 @@ def _export(
logger.warning("Shutdown in progress, aborting retry.")
break
# Not possible to reach here but the linter is complaining.
return self._result.FAILURE
return self._result.FAILURE # type: ignore [reportReturnType]

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
Expand Down
Loading