Skip to content

Commit f96a538

Browse files
committed
Merge branch 'main' into paullgdc/data-pipeline/stats_collector_hmap
2 parents ec0dfb6 + f61c42a commit f96a538

File tree

36 files changed

+1281
-110
lines changed

36 files changed

+1281
-110
lines changed

.github/workflows/diff-proto-files.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,4 @@ fi
3333
GO_AGENT_PROTO=$(curl -s "https://raw.githubusercontent.com/DataDog/datadog-agent/$DATADOG_AGENT_TAG/pkg/proto/datadog/trace/$PROTO_FILE")
3434
FIX_IMPORT_PATH=$(echo "$GO_AGENT_PROTO" | sed -e 's/import "datadog\/trace\//import "/g')
3535
FIX_PACKAGE_NAME=$(echo "$FIX_IMPORT_PATH" | sed -e 's/datadog\.trace/pb/g')
36-
echo "$FIX_PACKAGE_NAME" | diff "$PROTO_FILE" -
36+
echo "$FIX_PACKAGE_NAME" | diff -u "$PROTO_FILE" -

.github/workflows/verify-proto-files.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ on:
44
types: [ opened, synchronize, reopened ]
55

66
env:
7-
DATADOG_AGENT_TAG: "7.55.0-rc.3"
7+
DATADOG_AGENT_TAG: "7f6d07c93ba087f23f80a3f0c2da4b1f3dc664d7"
88
rust_version: "1.84.1"
99

1010
jobs:
@@ -35,6 +35,16 @@ jobs:
3535
working-directory: datadog-trace-protobuf/src/pb
3636
run: |
3737
../../../.github/workflows/diff-proto-files.sh --file span.proto --tag ${{ env.DATADOG_AGENT_TAG }}
38+
- name: diff idx/tracer_payload.proto
39+
if: success() || failure()
40+
working-directory: datadog-trace-protobuf/src/pb
41+
run: |
42+
../../../.github/workflows/diff-proto-files.sh --file idx/tracer_payload.proto --tag ${{ env.DATADOG_AGENT_TAG }}
43+
- name: diff idx/span.proto
44+
if: success() || failure()
45+
working-directory: datadog-trace-protobuf/src/pb
46+
run: |
47+
../../../.github/workflows/diff-proto-files.sh --file idx/span.proto --tag ${{ env.DATADOG_AGENT_TAG }}
3848
- name: Cache
3949
uses: ./.github/actions/cache
4050
with:

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin_tests/src/modes/behavior.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use std::path::Path;
1010
use std::sync::atomic::{AtomicPtr, Ordering};
1111

1212
use crate::modes::unix::*;
13+
use nix::sys::socket;
14+
use std::os::unix::io::AsRawFd;
1315

1416
/// Defines the additional behavior for a given crashtracking test
1517
pub trait Behavior {
@@ -89,6 +91,32 @@ pub fn remove_permissive(filepath: &Path) {
8991
let _ = std::fs::remove_file(filepath);
9092
}
9193

94+
// This helper function is used to trigger a SIGPIPE signal. This is useful to
95+
// verify that the crashtracker correctly suppresses the SIGPIPE signal while it
96+
// emitts information to the collector, and that the SIGPIPE signal can be emitted
97+
// and used normally afterwards, as tested in the following tests:
98+
// - test_001_sigpipe
99+
// - test_005_sigpipe_sigstack
100+
pub fn trigger_sigpipe() -> Result<()> {
101+
let (reader_fd, writer_fd) = socket::socketpair(
102+
socket::AddressFamily::Unix,
103+
socket::SockType::Stream,
104+
None,
105+
socket::SockFlag::empty(),
106+
)?;
107+
drop(reader_fd);
108+
109+
let writer_raw_fd = writer_fd.as_raw_fd();
110+
let write_result =
111+
unsafe { libc::write(writer_raw_fd, b"Hello".as_ptr() as *const libc::c_void, 5) };
112+
113+
if write_result != -1 {
114+
anyhow::bail!("Expected write to fail with SIGPIPE, but it succeeded");
115+
}
116+
117+
Ok(())
118+
}
119+
92120
pub fn get_behavior(mode_str: &str) -> Box<dyn Behavior> {
93121
match mode_str {
94122
"donothing" => Box::new(test_000_donothing::Test),
@@ -104,3 +132,77 @@ pub fn get_behavior(mode_str: &str) -> Box<dyn Behavior> {
104132
_ => panic!("Unknown mode: {mode_str}"),
105133
}
106134
}
135+
136+
#[cfg(test)]
137+
mod tests {
138+
use super::*;
139+
use std::sync::atomic::{AtomicBool, Ordering};
140+
141+
static SIGPIPE_CAUGHT: AtomicBool = AtomicBool::new(false);
142+
143+
extern "C" fn sigpipe_handler(_: libc::c_int) {
144+
SIGPIPE_CAUGHT.store(true, Ordering::SeqCst);
145+
}
146+
147+
#[test]
148+
#[cfg_attr(miri, ignore)]
149+
fn test_trigger_sigpipe() {
150+
use std::mem;
151+
use std::thread;
152+
153+
let result = thread::spawn(|| {
154+
SIGPIPE_CAUGHT.store(false, Ordering::SeqCst);
155+
156+
let mut sigset: libc::sigset_t = unsafe { mem::zeroed() };
157+
unsafe {
158+
libc::sigemptyset(&mut sigset);
159+
}
160+
161+
let sigpipe_action = libc::sigaction {
162+
sa_sigaction: sigpipe_handler as usize,
163+
sa_mask: sigset,
164+
sa_flags: libc::SA_RESTART | libc::SA_SIGINFO,
165+
#[cfg(target_os = "linux")]
166+
sa_restorer: None,
167+
};
168+
169+
let mut old_action: libc::sigaction = unsafe { mem::zeroed() };
170+
let install_result =
171+
unsafe { libc::sigaction(libc::SIGPIPE, &sigpipe_action, &mut old_action) };
172+
173+
if install_result != 0 {
174+
return Err("Failed to set up SIGPIPE handler".to_string());
175+
}
176+
177+
let trigger_result = trigger_sigpipe();
178+
179+
thread::sleep(std::time::Duration::from_millis(10));
180+
181+
let handler_called = SIGPIPE_CAUGHT.load(Ordering::SeqCst);
182+
183+
unsafe {
184+
libc::sigaction(libc::SIGPIPE, &old_action, std::ptr::null_mut());
185+
}
186+
187+
if trigger_result.is_err() {
188+
return Err(format!(
189+
"trigger_sigpipe should succeed: {:?}",
190+
trigger_result
191+
));
192+
}
193+
194+
if !handler_called {
195+
return Err("SIGPIPE handler should have been called".to_string());
196+
}
197+
198+
Ok(())
199+
})
200+
.join();
201+
202+
match result {
203+
Ok(Ok(())) => {} // Test passed
204+
Ok(Err(e)) => panic!("{}", e),
205+
Err(_) => panic!("Thread panicked during SIGPIPE test"),
206+
}
207+
}
208+
}

bin_tests/src/modes/unix/test_001_sigpipe.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
use crate::modes::behavior::Behavior;
2020
use crate::modes::behavior::{
2121
atom_to_clone, file_append_msg, fileat_content_equals, remove_permissive, removeat_permissive,
22-
set_atomic,
22+
set_atomic, trigger_sigpipe,
2323
};
2424

2525
use datadog_crashtracker::CrashtrackerConfiguration;
2626
use libc;
27-
use nix::sys::socket;
28-
use std::io::prelude::*;
29-
use std::os::unix::net::UnixStream;
3027
use std::path::{Path, PathBuf};
3128
use std::sync::atomic::AtomicPtr;
3229

@@ -80,19 +77,7 @@ fn inner(output_dir: &Path, filename: &str) -> anyhow::Result<()> {
8077
set_atomic(&OUTPUT_FILE, output_dir.join(filename));
8178
let ofile = atom_to_clone(&OUTPUT_FILE)?;
8279

83-
// Cause a SIGPIPE to occur by opening a socketpair, closing the read side, and writing into
84-
// the write side.
85-
let (reader_fd, writer_fd) = socket::socketpair(
86-
socket::AddressFamily::Unix,
87-
socket::SockType::Stream,
88-
None,
89-
socket::SockFlag::empty(),
90-
)?;
91-
drop(reader_fd);
92-
let mut writer = UnixStream::from(writer_fd);
93-
if writer.write_all(b"Hello").is_ok() {
94-
anyhow::bail!("Expected write to fail, but it succeeded");
95-
}
80+
trigger_sigpipe()?;
9681

9782
// Now check the output file. Strongly assumes that nothing happened to change the value of
9883
// OUTPUT_FILE within the handler.

bin_tests/src/modes/unix/test_005_sigpipe_sigstack.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,11 @@
1010
use crate::modes::behavior::Behavior;
1111
use crate::modes::behavior::{
1212
atom_to_clone, file_append_msg, file_content_equals, fileat_content_equals, remove_permissive,
13-
removeat_permissive, set_atomic,
13+
removeat_permissive, set_atomic, trigger_sigpipe,
1414
};
1515

1616
use datadog_crashtracker::CrashtrackerConfiguration;
1717
use libc;
18-
use nix::sys::socket;
19-
use std::io::prelude::*;
20-
use std::os::unix::net::UnixStream;
2118
use std::path::{Path, PathBuf};
2219
use std::sync::atomic::AtomicPtr;
2320

@@ -71,19 +68,7 @@ fn inner(output_dir: &Path, filename: &str) -> anyhow::Result<()> {
7168
set_atomic(&OUTPUT_FILE, output_dir.join(filename));
7269
let ofile = atom_to_clone(&OUTPUT_FILE)?;
7370

74-
// Cause a SIGPIPE to occur by opening a socketpair, closing the read side, and writing into
75-
// the write side.
76-
let (reader_fd, writer_fd) = socket::socketpair(
77-
socket::AddressFamily::Unix,
78-
socket::SockType::Stream,
79-
None,
80-
socket::SockFlag::empty(),
81-
)?;
82-
drop(reader_fd);
83-
let mut writer = UnixStream::from(writer_fd);
84-
if writer.write_all(b"Hello").is_ok() {
85-
anyhow::bail!("Expected write to fail, but it succeeded");
86-
}
71+
trigger_sigpipe()?;
8772

8873
// Now check the output file. Strongly assumes that nothing happened to change the value of
8974
// OUTPUT_FILE within the handler.

data-pipeline/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ harness = false
5252
path = "benches/main.rs"
5353

5454
[dev-dependencies]
55+
clap = { version = "4.0", features = ["derive"] }
5556
criterion = "0.5.1"
5657
datadog-trace-utils = { path = "../datadog-trace-utils", features = [
5758
"test-utils",

data-pipeline/examples/send-traces-with-stats.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use clap::Parser;
45
use data_pipeline::trace_exporter::{
56
TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
67
};
@@ -29,10 +30,24 @@ fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span {
2930
}
3031
}
3132

33+
#[derive(Parser)]
34+
#[command(name = "send-traces-with-stats")]
35+
#[command(about = "A data pipeline example for sending traces with statistics")]
36+
struct Args {
37+
#[arg(
38+
short = 'u',
39+
long = "url",
40+
default_value = "http://localhost:8126",
41+
help = "Set the trace agent URL\n\nExamples:\n http://localhost:8126 (default)\n windows://./pipe/dd-apm-test-agent (Windows named pipe)\n https://trace.agent.datadoghq.com:443 (custom endpoint)"
42+
)]
43+
url: String,
44+
}
45+
3246
fn main() {
47+
let args = Args::parse();
3348
let mut builder = TraceExporter::builder();
3449
builder
35-
.set_url("http://localhost:8126")
50+
.set_url(&args.url)
3651
.set_hostname("test")
3752
.set_env("testing")
3853
.set_app_version(env!("CARGO_PKG_VERSION"))

0 commit comments

Comments
 (0)