Skip to content

Commit 3fe647d

Browse files
authored
Merge branch 'main' into propogate-trace-errors
2 parents 83c695a + c54d2b8 commit 3fe647d

File tree

25 files changed

+480
-51
lines changed

25 files changed

+480
-51
lines changed

examples/tracing-http-propagator/src/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
121121
fn force_flush(&self) -> OTelSdkResult {
122122
Ok(())
123123
}
124+
125+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
126+
Ok(())
127+
}
124128
}
125129

126130
/// A custom span processor that enriches spans with baggage attributes. Baggage

opentelemetry-appender-log/examples/logs-basic.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
//! run with `$ cargo run --example logs-basic`
22
3-
/// This example shows how to use in_memory_exporter for logs. This uses opentelemetry-appender-log crate, which is a
4-
/// [logging appender](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/glossary.md#log-appender--bridge) that bridges logs from the [log crate](https://docs.rs/log/latest/log/) to OpenTelemetry.
5-
/// The example setups a LoggerProvider with a in-memory exporter, so emitted logs are stored in memory.
3+
/// This example shows how to use stdout exporter for logs. This uses
4+
/// opentelemetry-appender-log crate, which is a [logging
5+
/// appender](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/glossary.md#log-appender--bridge)
6+
/// that bridges logs from the [log crate](https://docs.rs/log/latest/log/) to
7+
/// OpenTelemetry. The example setups a LoggerProvider with a stdout exporter,
8+
/// so emitted logs are written to stdout.
69
///
710
use log::{error, info, warn, Level};
811
use opentelemetry_appender_log::OpenTelemetryLogBridge;
9-
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
12+
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
1013
use opentelemetry_stdout::LogExporter;
1114

1215
#[tokio::main]
@@ -15,7 +18,12 @@ async fn main() {
1518
let exporter = LogExporter::default();
1619
//Create a LoggerProvider and register the exporter
1720
let logger_provider = SdkLoggerProvider::builder()
18-
.with_log_processor(BatchLogProcessor::builder(exporter).build())
21+
.with_resource(
22+
Resource::builder()
23+
.with_service_name("log-appender-log-example")
24+
.build(),
25+
)
26+
.with_simple_exporter(exporter)
1927
.build();
2028

2129
// Setup Log Appender for the log crate.

opentelemetry-appender-tracing/benches/log-attributes.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ impl LogProcessor for NoopProcessor {
4343
fn force_flush(&self) -> OTelSdkResult {
4444
Ok(())
4545
}
46+
47+
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
48+
Ok(())
49+
}
4650
}
4751

4852
/// Creates a single benchmark for a specific number of attributes

opentelemetry-appender-tracing/benches/logs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl LogProcessor for NoopProcessor {
6262
) -> bool {
6363
self.enabled
6464
}
65+
66+
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
67+
Ok(())
68+
}
6569
}
6670

6771
struct NoOpLogLayer {

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,10 @@ mod tests {
934934
fn force_flush(&self) -> OTelSdkResult {
935935
Ok(())
936936
}
937+
938+
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
939+
Ok(())
940+
}
937941
}
938942

939943
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-otlp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## vNext
44

5+
- Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865)
56
- Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies[3191](https://github.com/open-telemetry/opentelemetry-rust/pull/3192)
67

78
## 0.31.0

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
use super::OtlpHttpClient;
2+
use crate::Protocol;
3+
use opentelemetry::{otel_debug, otel_warn};
24
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
35
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
6+
use prost::Message;
47
use std::time;
58

69
impl LogExporter for OtlpHttpClient {
710
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
8-
self.export_http_with_retry(
9-
batch,
10-
OtlpHttpClient::build_logs_export_body,
11-
"HttpLogsClient.Export",
12-
)
13-
.await
11+
let response_body = self
12+
.export_http_with_retry(
13+
batch,
14+
OtlpHttpClient::build_logs_export_body,
15+
"HttpLogsClient.Export",
16+
)
17+
.await?;
18+
19+
handle_partial_success(&response_body, self.protocol);
20+
Ok(())
1421
}
1522

1623
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
@@ -29,3 +36,68 @@ impl LogExporter for OtlpHttpClient {
2936
self.resource = resource.into();
3037
}
3138
}
39+
40+
/// Handles partial success returned by OTLP endpoints. We log the rejected log records,
41+
/// as well as the error message returned.
42+
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
43+
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceResponse;
44+
45+
let response: ExportLogsServiceResponse = match protocol {
46+
#[cfg(feature = "http-json")]
47+
Protocol::HttpJson => match serde_json::from_slice(response_body) {
48+
Ok(r) => r,
49+
Err(e) => {
50+
otel_debug!(name: "HttpLogsClient.ResponseParseError", error = e.to_string());
51+
return;
52+
}
53+
},
54+
_ => match Message::decode(response_body) {
55+
Ok(r) => r,
56+
Err(e) => {
57+
otel_debug!(name: "HttpLogsClient.ResponseParseError", error = e.to_string());
58+
return;
59+
}
60+
},
61+
};
62+
63+
if let Some(partial_success) = response.partial_success {
64+
if partial_success.rejected_log_records > 0 || !partial_success.error_message.is_empty() {
65+
otel_warn!(
66+
name: "HttpLogsClient.PartialSuccess",
67+
rejected_log_records = partial_success.rejected_log_records,
68+
error_message = partial_success.error_message.as_str(),
69+
);
70+
}
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
78+
#[test]
79+
fn test_handle_invalid_protobuf() {
80+
// Corrupted/invalid protobuf data
81+
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];
82+
83+
// Should not panic - logs debug and returns early
84+
handle_partial_success(&invalid, Protocol::HttpBinary);
85+
}
86+
87+
#[test]
88+
fn test_handle_empty_response() {
89+
let empty = vec![];
90+
91+
// Should not panic
92+
handle_partial_success(&empty, Protocol::HttpBinary);
93+
}
94+
95+
#[cfg(feature = "http-json")]
96+
#[test]
97+
fn test_handle_invalid_json() {
98+
let invalid_json = b"{not valid json}";
99+
100+
// Should not panic - logs debug and returns
101+
handle_partial_success(invalid_json, Protocol::HttpJson);
102+
}
103+
}

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::metric::MetricsClient;
2+
use crate::Protocol;
3+
use opentelemetry::{otel_debug, otel_warn};
24
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
35
use opentelemetry_sdk::metrics::data::ResourceMetrics;
6+
use prost::Message;
47

58
use super::OtlpHttpClient;
69

@@ -12,8 +15,12 @@ impl MetricsClient for OtlpHttpClient {
1215
.ok_or_else(|| "Failed to serialize metrics".to_string())
1316
};
1417

15-
self.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
16-
.await
18+
let response_body = self
19+
.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
20+
.await?;
21+
22+
handle_partial_success(&response_body, self.protocol);
23+
Ok(())
1724
}
1825

1926
fn shutdown(&self) -> OTelSdkResult {
@@ -25,3 +32,68 @@ impl MetricsClient for OtlpHttpClient {
2532
Ok(())
2633
}
2734
}
35+
36+
/// Handles partial success returned by OTLP endpoints. We log the rejected data points,
37+
/// as well as the error message returned.
38+
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
39+
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse;
40+
41+
let response: ExportMetricsServiceResponse = match protocol {
42+
#[cfg(feature = "http-json")]
43+
Protocol::HttpJson => match serde_json::from_slice(response_body) {
44+
Ok(r) => r,
45+
Err(e) => {
46+
otel_debug!(name: "HttpMetricsClient.ResponseParseError", error = e.to_string());
47+
return;
48+
}
49+
},
50+
_ => match Message::decode(response_body) {
51+
Ok(r) => r,
52+
Err(e) => {
53+
otel_debug!(name: "HttpMetricsClient.ResponseParseError", error = e.to_string());
54+
return;
55+
}
56+
},
57+
};
58+
59+
if let Some(partial_success) = response.partial_success {
60+
if partial_success.rejected_data_points > 0 || !partial_success.error_message.is_empty() {
61+
otel_warn!(
62+
name: "HttpMetricsClient.PartialSuccess",
63+
rejected_data_points = partial_success.rejected_data_points,
64+
error_message = partial_success.error_message.as_str(),
65+
);
66+
}
67+
}
68+
}
69+
70+
#[cfg(test)]
71+
mod tests {
72+
use super::*;
73+
74+
#[test]
75+
fn test_handle_invalid_protobuf() {
76+
// Corrupted/invalid protobuf data
77+
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];
78+
79+
// Should not panic - logs debug and returns early
80+
handle_partial_success(&invalid, Protocol::HttpBinary);
81+
}
82+
83+
#[test]
84+
fn test_handle_empty_response() {
85+
let empty = vec![];
86+
87+
// Should not panic
88+
handle_partial_success(&empty, Protocol::HttpBinary);
89+
}
90+
91+
#[cfg(feature = "http-json")]
92+
#[test]
93+
fn test_handle_invalid_json() {
94+
let invalid_json = b"{not valid json}";
95+
96+
// Should not panic - logs debug and returns
97+
handle_partial_success(invalid_json, Protocol::HttpJson);
98+
}
99+
}

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use super::{
55
use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
66
use http::{HeaderName, HeaderValue, Uri};
77
use opentelemetry::otel_debug;
8-
use opentelemetry_http::HttpClient;
8+
use opentelemetry_http::{Bytes, HttpClient};
99
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
1010
#[cfg(feature = "logs")]
1111
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
@@ -381,7 +381,7 @@ impl OtlpHttpClient {
381381
data: T,
382382
build_body_fn: F,
383383
operation_name: &'static str,
384-
) -> opentelemetry_sdk::error::OTelSdkResult
384+
) -> Result<Bytes, opentelemetry_sdk::error::OTelSdkError>
385385
where
386386
F: Fn(&Self, T) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String>,
387387
{
@@ -408,7 +408,7 @@ impl OtlpHttpClient {
408408
#[cfg(not(feature = "reqwest-blocking-client"))]
409409
let runtime = opentelemetry_sdk::runtime::Tokio;
410410

411-
retry_with_backoff(
411+
let response_body = retry_with_backoff(
412412
runtime,
413413
self.retry_policy.clone(),
414414
classify_http_export_error,
@@ -424,7 +424,9 @@ impl OtlpHttpClient {
424424
},
425425
)
426426
.await
427-
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))
427+
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))?;
428+
429+
Ok(response_body)
428430
}
429431

430432
#[cfg(not(feature = "experimental-http-retry"))]
@@ -438,9 +440,12 @@ impl OtlpHttpClient {
438440
endpoint: self.collector_endpoint.to_string(),
439441
};
440442

441-
self.export_http_once(&retry_data, content_type, content_encoding, operation_name)
443+
let response_body = self
444+
.export_http_once(&retry_data, content_type, content_encoding, operation_name)
442445
.await
443-
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))
446+
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))?;
447+
448+
Ok(response_body)
444449
}
445450
}
446451

@@ -451,7 +456,7 @@ impl OtlpHttpClient {
451456
content_type: &'static str,
452457
content_encoding: Option<&'static str>,
453458
_operation_name: &'static str,
454-
) -> Result<(), HttpExportError> {
459+
) -> Result<Bytes, HttpExportError> {
455460
// Get client
456461
let client = self
457462
.client
@@ -510,7 +515,9 @@ impl OtlpHttpClient {
510515
}
511516

512517
otel_debug!(name: "HttpClient.ExportSucceeded");
513-
Ok(())
518+
519+
// Return the response, consuming the body to save a copy
520+
Ok(response.into_body())
514521
}
515522

516523
/// Compress data using gzip or zstd if the user has requested it and the relevant feature

0 commit comments

Comments
 (0)