Skip to content

Commit

Permalink
Init CrashTracker support in the sidecar (#535)
Browse files Browse the repository at this point in the history
* Fix typos + add a missing func in profiling-ffi

* Init CrashTracker support in the sidecar

* Fix CI

* Create one socket per spawned sidecar

* cargo fmt

* Fix typo

* Fix rebase

* Fix clippy

* Remove unnecessary strip_prefix("file")

* Fix rebase

* Restore profiling-ffi/Cargo.toml as change is not required anymore

* Cleanup
  • Loading branch information
iamluc authored Aug 19, 2024
1 parent 31b6854 commit df81898
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bin_tests/src/bin/crashtracker_bin_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod unix {
let mode = args.next().context("Unexpected number of arguments")?;
let output_url = args.next().context("Unexpected number of arguments")?;
let receiver_binary = args.next().context("Unexpected number of arguments")?;
let unix_socket_reciever_binary = args.next().context("Unexpected number of arguments")?;
let unix_socket_receiver_binary = args.next().context("Unexpected number of arguments")?;
let stderr_filename = args.next().context("Unexpected number of arguments")?;
let stdout_filename = args.next().context("Unexpected number of arguments")?;
let socket_path = args.next().context("Unexpected number of arguments")?;
Expand Down Expand Up @@ -83,7 +83,7 @@ mod unix {
// For now, this exits when a single message is received.
// When the listener is updated, we'll need to keep the handle and kill the receiver
// to avoid leaking a process.
std::process::Command::new(unix_socket_reciever_binary)
std::process::Command::new(unix_socket_receiver_binary)
.stderr(File::create(stderr_filename)?)
.stdout(File::create(stdout_filename)?)
.arg(&socket_path)
Expand Down
2 changes: 1 addition & 1 deletion bin_tests/src/bin/crashtracker_unix_socket_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ fn main() -> anyhow::Result<()> {
args.len() == 2,
"Usage: crashtracker_unix_socket_receiver path_to_unix_socket"
);
datadog_crashtracker::reciever_entry_point_unix_socket(&args[1])
datadog_crashtracker::receiver_entry_point_unix_socket(&args[1])
}
28 changes: 28 additions & 0 deletions crashtracker-ffi/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::Result;
use anyhow::Context;
pub use counters::*;
pub use datatypes::*;
use ddcommon_ffi::{slice::AsBytes, CharSlice};
pub use spans::*;

#[no_mangle]
Expand Down Expand Up @@ -95,3 +96,30 @@ pub unsafe extern "C" fn ddog_crasht_init_with_receiver(
.context("ddog_crasht_init_with_receiver failed")
.into()
}

#[no_mangle]
#[must_use]
/// Initialize the crash-tracking infrastructure, writing to an unix socket in case of crash.
///
/// # Preconditions
/// None.
/// # Safety
/// Crash-tracking functions are not reentrant.
/// No other crash-handler functions should be called concurrently.
/// # Atomicity
/// This function is not atomic. A crash during its execution may lead to
/// unexpected crash-handling behaviour.
pub unsafe extern "C" fn ddog_crasht_init_with_unix_socket(
config: Config,
socket_path: CharSlice,
metadata: Metadata,
) -> Result {
(|| {
let config = config.try_into()?;
let socket_path = socket_path.try_to_utf8()?;
let metadata = metadata.try_into()?;
datadog_crashtracker::init_with_unix_socket(config, socket_path, metadata)
})()
.context("ddog_crasht_init_with_unix_socket failed")
.into()
}
2 changes: 1 addition & 1 deletion crashtracker-ffi/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub unsafe extern "C" fn ddog_crasht_receiver_entry_point_unix_socket(
) -> Result {
(|| {
let socket_path = socket_path.try_to_utf8()?;
datadog_crashtracker::reciever_entry_point_unix_socket(socket_path)
datadog_crashtracker::receiver_entry_point_unix_socket(socket_path)
})()
.context("ddog_crasht_receiver_entry_point_unix_socket failed")
.into()
Expand Down
2 changes: 1 addition & 1 deletion crashtracker/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.19)
project(datadog_profiling_crashtracking_reciever LANGUAGES C CXX)
project(datadog_profiling_crashtracking_receiver LANGUAGES C CXX)

find_package(Datadog REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion crashtracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ serde = {version = "1.0", features = ["derive"]}
serde_json = {version = "1.0"}
uuid = { version = "1.4.1", features = ["v4", "serde"] }
ddtelemetry = {path = "../ddtelemetry"}
tokio = { version = "1.23", features = ["rt", "macros"] }
tokio = { version = "1.23", features = ["rt", "macros", "io-std", "io-util"] }
http = "0.2"
portable-atomic = { version = "1.6.0", features = ["serde"] }
rand = "0.8.5"
Expand Down
20 changes: 15 additions & 5 deletions crashtracker/src/crash_info/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,6 @@ impl CrashInfo {

impl CrashInfo {
/// Emit the CrashInfo as structured json in file `path`.
/// SIGNAL SAFETY:
/// I believe but have not verified this is signal safe.
pub fn to_file(&self, path: &Path) -> anyhow::Result<()> {
let file =
File::create(path).with_context(|| format!("Failed to create {}", path.display()))?;
Expand All @@ -249,6 +247,17 @@ impl CrashInfo {
}

pub fn upload_to_endpoint(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

rt.block_on(async { self.async_upload_to_endpoint(endpoint).await })
}

pub async fn async_upload_to_endpoint(
&self,
endpoint: &Option<Endpoint>,
) -> anyhow::Result<()> {
// If we're debugging to a file, dump the actual crashinfo into a json
if let Some(endpoint) = endpoint {
if Some("file") == endpoint.url.scheme_str() {
Expand All @@ -257,13 +266,14 @@ impl CrashInfo {
self.to_file(&path)?;
}
}
self.upload_to_telemetry(endpoint)

self.upload_to_telemetry(endpoint).await
}

fn upload_to_telemetry(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
async fn upload_to_telemetry(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
if let Some(metadata) = &self.metadata {
if let Ok(uploader) = TelemetryCrashUploader::new(metadata, endpoint) {
uploader.upload_to_telemetry(self)?;
uploader.upload_to_telemetry(self).await?
}
}
Ok(())
Expand Down
37 changes: 17 additions & 20 deletions crashtracker/src/crash_info/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct TelemetryCrashInfoMessage<'a> {
}

pub struct TelemetryCrashUploader {
rt: tokio::runtime::Runtime,
metadata: TelemetryMetadata,
cfg: ddtelemetry::config::Config,
}
Expand Down Expand Up @@ -109,9 +108,6 @@ impl TelemetryCrashUploader {
let host = build_host();

let s = Self {
rt: tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
metadata: TelemetryMetadata {
host,
application,
Expand All @@ -122,7 +118,7 @@ impl TelemetryCrashUploader {
Ok(s)
}

pub fn upload_to_telemetry(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
pub async fn upload_to_telemetry(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
let metadata = &self.metadata;

let message = serde_json::to_string(&TelemetryCrashInfoMessage {
Expand Down Expand Up @@ -170,19 +166,19 @@ impl TelemetryCrashUploader {
ddcommon::header::APPLICATION_JSON,
)
.body(serde_json::to_string(&payload)?.into())?;
self.rt.block_on(async {
tokio::time::timeout(
std::time::Duration::from_millis({
if let Some(endp) = self.cfg.endpoint.as_ref() {
endp.timeout_ms
} else {
Endpoint::DEFAULT_TIMEOUT
}
}),
client.request(req),
)
.await
})??;

tokio::time::timeout(
std::time::Duration::from_millis({
if let Some(endp) = self.cfg.endpoint.as_ref() {
endp.timeout_ms
} else {
Endpoint::DEFAULT_TIMEOUT
}
}),
client.request(req),
)
.await??;

Ok(())
}
}
Expand Down Expand Up @@ -254,9 +250,9 @@ mod tests {
);
}

#[test]
#[tokio::test]
#[cfg_attr(miri, ignore)]
fn test_crash_request_content() {
async fn test_crash_request_content() {
let tmp = tempfile::tempdir().unwrap();
let output_filename = {
let mut p = tmp.into_path();
Expand Down Expand Up @@ -291,6 +287,7 @@ mod tests {
uuid: uuid::uuid!("1d6b97cb-968c-40c9-af6e-e4b4d71e8781"),
incomplete: true,
})
.await
.unwrap();

let payload: serde_json::value::Value =
Expand Down
5 changes: 4 additions & 1 deletion crashtracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ pub use collector::{
pub use crash_info::*;

#[cfg(all(unix, feature = "receiver"))]
pub use receiver::{receiver_entry_point_stdin, reciever_entry_point_unix_socket};
pub use receiver::{
async_receiver_entry_point_unix_socket, receiver_entry_point_stdin,
receiver_entry_point_unix_socket,
};

#[cfg(all(unix, any(feature = "collector", feature = "receiver")))]
pub use shared::configuration::{
Expand Down
58 changes: 43 additions & 15 deletions crashtracker/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use super::*;
use crate::shared::constants::*;
use anyhow::Context;
use std::{io::BufReader, os::unix::net::UnixListener};
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::net::UnixListener;

pub fn resolve_frames(
config: &CrashtrackerConfiguration,
Expand Down Expand Up @@ -33,17 +35,38 @@ pub fn get_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListe
Ok(unix_listener)
}

pub fn reciever_entry_point_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<()> {
let listener = get_unix_socket(socket_path)?;
let (unix_stream, _) = listener.accept()?;
let stream = BufReader::new(unix_stream);
receiver_entry_point(stream)
pub fn receiver_entry_point_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(async_receiver_entry_point_unix_socket(socket_path, true))?;
Ok(())
// Dropping the stream closes it, allowing the collector to exit if it was waiting.
}

pub fn receiver_entry_point_stdin() -> anyhow::Result<()> {
let stream = std::io::stdin().lock();
receiver_entry_point(stream)
let stream = BufReader::new(tokio::io::stdin());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(receiver_entry_point(stream))?;
Ok(())
}

pub async fn async_receiver_entry_point_unix_socket(
socket_path: impl AsRef<str>,
one_shot: bool,
) -> anyhow::Result<()> {
let listener = get_unix_socket(socket_path)?;
loop {
let (unix_stream, _) = listener.accept().await?;
let stream = BufReader::new(unix_stream);
let res = receiver_entry_point(stream).await;

if one_shot {
return res;
}
}
}

/// Receives data from a crash collector via a pipe on `stdin`, formats it into
Expand All @@ -55,17 +78,19 @@ pub fn receiver_entry_point_stdin() -> anyhow::Result<()> {
///
/// See comments in [crashtracker/lib.rs] for a full architecture
/// description.
fn receiver_entry_point(stream: impl std::io::BufRead) -> anyhow::Result<()> {
match receive_report(stream)? {
async fn receiver_entry_point(
stream: impl AsyncBufReadExt + std::marker::Unpin,
) -> anyhow::Result<()> {
match receive_report(stream).await? {
CrashReportStatus::NoCrash => Ok(()),
CrashReportStatus::CrashReport(config, mut crash_info) => {
resolve_frames(&config, &mut crash_info)?;
crash_info.upload_to_endpoint(&config.endpoint)
crash_info.async_upload_to_endpoint(&config.endpoint).await
}
CrashReportStatus::PartialCrashReport(config, mut crash_info, stdin_state) => {
eprintln!("Failed to fully receive crash. Exit state was: {stdin_state:?}");
resolve_frames(&config, &mut crash_info)?;
crash_info.upload_to_endpoint(&config.endpoint)
crash_info.async_upload_to_endpoint(&config.endpoint).await
}
}
}
Expand Down Expand Up @@ -237,14 +262,17 @@ enum CrashReportStatus {
/// In the case where the parent failed to transfer a full crash-report
/// (for instance if it crashed while calculating the crash-report), we return
/// a PartialCrashReport.
fn receive_report(stream: impl std::io::BufRead) -> anyhow::Result<CrashReportStatus> {
async fn receive_report(
stream: impl AsyncBufReadExt + std::marker::Unpin,
) -> anyhow::Result<CrashReportStatus> {
let mut crashinfo = CrashInfo::new();
let mut stdin_state = StdinState::Waiting;
let mut config = None;

let mut lines = stream.lines();

//TODO: This assumes that the input is valid UTF-8.
for line in stream.lines() {
let line = line?;
while let Some(line) = lines.next_line().await? {
match process_line(&mut crashinfo, &mut config, line, stdin_state) {
Ok(next_state) => stdin_state = next_state,
Err(e) => {
Expand Down
16 changes: 16 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use datadog_sidecar::agent_remote_config::{
};
use datadog_sidecar::config;
use datadog_sidecar::config::LogMethod;
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
use datadog_sidecar::dogstatsd::DogStatsDAction;
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
use datadog_sidecar::service::{
Expand Down Expand Up @@ -827,3 +828,18 @@ pub extern "C" fn ddog_sidecar_reconnect(
) {
transport.reconnect(|| unsafe { factory() });
}

/// Return the path of the crashtracker unix domain socket.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi::CharSlice<'static>
{
let socket_path = crashtracker_unix_socket_path();
let str = socket_path.to_str().unwrap_or_default();

let size = str.len();
let malloced = libc::malloc(size) as *mut u8;
let buf = slice::from_raw_parts_mut(malloced, size);
buf.copy_from_slice(str.as_bytes());
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
}
1 change: 1 addition & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ datadog-trace-protobuf = { path = "../trace-protobuf" }
datadog-trace-utils = { path = "../trace-utils" }
datadog-trace-normalization = { path = "../trace-normalization" }
datadog-remote-config = { path = "../remote-config" }
datadog-crashtracker = { path = "../crashtracker" }

futures = { version = "0.3", default-features = false }
manual_future = "0.1.1"
Expand Down
13 changes: 13 additions & 0 deletions sidecar/src/crashtracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::{env, path::PathBuf};

use crate::primary_sidecar_identifier;

pub fn crashtracker_unix_socket_path() -> PathBuf {
env::temp_dir().join(format!(
concat!("libdatadog.ct.", crate::sidecar_version!(), "@{}.sock"),
primary_sidecar_identifier()
))
}
Loading

0 comments on commit df81898

Please sign in to comment.