Skip to content

Commit 326e055

Browse files
committed
Allow submitting Vec<Vec<Span>> asynchronously
1 parent af93f6f commit 326e055

File tree

1 file changed

+27
-12
lines changed
  • libdd-data-pipeline/src/trace_exporter

1 file changed

+27
-12
lines changed

libdd-data-pipeline/src/trace_exporter/mod.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,24 @@ impl TraceExporter {
592592
trace_chunks: Vec<Vec<Span<T>>>,
593593
) -> Result<AgentResponse, TraceExporterError> {
594594
self.check_agent_info();
595-
self.send_trace_chunks_inner(trace_chunks)
595+
self.runtime()?
596+
.block_on(async { self.send_trace_chunks_inner(trace_chunks).await })
597+
}
598+
599+
/// Send a list of trace chunks to the agent, asynchronously
600+
///
601+
/// # Arguments
602+
/// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
603+
///
604+
/// # Returns
605+
/// * Ok(String): The response from the agent
606+
/// * Err(TraceExporterError): An error detailing what went wrong in the process
607+
pub async fn send_trace_chunks_async<T: SpanText>(
608+
&self,
609+
trace_chunks: Vec<Vec<Span<T>>>,
610+
) -> Result<AgentResponse, TraceExporterError> {
611+
self.check_agent_info();
612+
self.send_trace_chunks_inner(trace_chunks).await
596613
}
597614

598615
/// Deserializes, processes and sends trace chunks to the agent
@@ -622,7 +639,8 @@ impl TraceExporter {
622639
None,
623640
);
624641

625-
self.send_trace_chunks_inner(traces)
642+
self.runtime()?
643+
.block_on(async { self.send_trace_chunks_inner(traces).await })
626644
}
627645

628646
/// Send traces payload to agent with retry and telemetry reporting
@@ -669,7 +687,7 @@ impl TraceExporter {
669687
self.handle_send_result(result, chunks, payload_len).await
670688
}
671689

672-
fn send_trace_chunks_inner<T: SpanText>(
690+
async fn send_trace_chunks_inner<T: SpanText>(
673691
&self,
674692
mut traces: Vec<Vec<Span<T>>>,
675693
) -> Result<AgentResponse, TraceExporterError> {
@@ -694,15 +712,12 @@ impl TraceExporter {
694712
..self.endpoint.clone()
695713
};
696714

697-
self.runtime()?.block_on(async {
698-
self.send_traces_with_telemetry(
699-
&endpoint,
700-
prepared.data,
701-
prepared.headers,
702-
prepared.chunk_count,
703-
)
704-
.await
705-
})
715+
self.send_traces_with_telemetry(
716+
&endpoint,
717+
prepared.data,
718+
prepared.headers,
719+
prepared.chunk_count,
720+
).await
706721
}
707722

708723
/// Handle the result of sending traces to the agent

0 commit comments

Comments
 (0)