Merge branch 'main' into upstream-dogstatsd
duncanpharvey committed Sep 4, 2024
2 parents 92201f2 + c035537 commit afadf29
Showing 21 changed files with 584 additions and 44 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/lint.yml
@@ -47,7 +47,9 @@ jobs:
run: chmod +x ./scripts/install-protoc.sh && ./scripts/install-protoc.sh $HOME
- name: Run clippy on ${{ matrix.platform }} ${{ matrix.rust_version }}
shell: bash
-      run: cargo clippy --all-targets --all-features -- -D warnings "$([ ${{ matrix.rust_version }} = 1.76.0 ] && echo -Aunknown-lints)"
+      run: |
+        # shellcheck disable=SC2046
+        cargo clippy --all-targets --all-features -- -D warnings $([ ${{ matrix.rust_version }} = 1.76.0 ] && echo -Aunknown-lints -Aclippy::cast_ref_to_mut)
licensecheck:
runs-on: ubuntu-latest
name: "Presence of licence headers"
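Note on the lint change: the quotes around the command substitution were dropped deliberately so that the shell word-splits the emitted flags into two separate arguments on the 1.76.0 toolchain; quoted, "-Aunknown-lints -Aclippy::cast_ref_to_mut" would reach clippy as a single argument. An unquoted substitution is exactly what ShellCheck rule SC2046 flags, hence the suppression.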
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions data-pipeline/Cargo.toml
@@ -5,6 +5,7 @@ rust-version.workspace = true
edition.workspace = true
version.workspace = true
license.workspace = true
+autobenches = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

@@ -26,5 +27,11 @@ datadog-ddsketch = { path = "../ddsketch"}
[lib]
bench = false

+[[bench]]
+name = "main"
+harness = false
+path = "benches/main.rs"

[dev-dependencies]
criterion = "0.5.1"
rand = "0.8.5"
7 changes: 7 additions & 0 deletions data-pipeline/benches/main.rs
@@ -0,0 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use criterion::criterion_main;

mod span_concentrator_bench;

criterion_main!(span_concentrator_bench::benches);
72 changes: 72 additions & 0 deletions data-pipeline/benches/span_concentrator_bench.rs
@@ -0,0 +1,72 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use std::{
collections::HashMap,
time::{self, Duration, SystemTime},
};

use criterion::{criterion_group, Criterion};
use data_pipeline::span_concentrator::SpanConcentrator;
use datadog_trace_protobuf::pb;

fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n);
start.as_nanos() as i64
}

fn get_span(now: SystemTime, trace_id: u64, span_id: u64) -> pb::Span {
let mut metrics = HashMap::from([("_dd.measured".to_string(), 1.0)]);
if span_id == 1 {
metrics.insert("_dd.top_level".to_string(), 1.0);
}
let mut meta = HashMap::from([("db_name".to_string(), "postgres".to_string())]);
if span_id % 3 == 0 {
meta.insert("bucket_s3".to_string(), "aws_bucket".to_string());
}
pb::Span {
trace_id,
span_id,
service: "test-service".to_string(),
name: "test-name".to_string(),
resource: format!("test-{trace_id}"),
error: (span_id % 2) as i32,
metrics,
meta,
parent_id: span_id - 1,
start: get_bucket_start(now, trace_id),
duration: span_id as i64 % Duration::from_secs(10).as_nanos() as i64,
..Default::default()
}
}

pub fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("concentrator");
let now = SystemTime::now() - Duration::from_secs(10 * 100);
let concentrator = SpanConcentrator::new(
Duration::from_secs(10),
now,
true,
true,
vec!["db_name".to_string(), "bucket_s3".to_string()],
);
let mut spans = vec![];
for trace_id in 1..100 {
for span_id in 1..100 {
spans.push(get_span(now, trace_id, span_id));
}
}
group.bench_function("add_spans_to_concentrator", |b| {
b.iter_batched_ref(
|| (concentrator.clone(), spans.clone()),
|data| {
let concentrator = &mut data.0;
let spans = &data.1;
for span in spans {
concentrator.add_span(span);
}
},
criterion::BatchSize::LargeInput,
);
});
}
criterion_group!(benches, criterion_benchmark);
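Note: iter_batched_ref hands the timed routine a mutable reference to a freshly cloned (concentrator, spans) pair for each batch, so every run starts from an empty concentrator and the clone cost stays outside the measurement; BatchSize::LargeInput signals to Criterion that each input is expensive to construct.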
2 changes: 1 addition & 1 deletion data-pipeline/src/lib.rs
@@ -1,5 +1,5 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

-mod span_concentrator;
+pub mod span_concentrator;
pub mod trace_exporter;
4 changes: 2 additions & 2 deletions data-pipeline/src/span_concentrator/aggregation.rs
@@ -110,7 +110,7 @@ fn get_peer_tags(span: &pb::Span, peer_tag_keys: &[String]) -> Vec<Tag> {
}

/// The stats computed from a group of spans with the same AggregationKey
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub(super) struct GroupedStats {
hits: u64,
errors: u64,
@@ -140,7 +140,7 @@ impl GroupedStats {

/// A time bucket used for stats aggregation. It stores a map of GroupedStats storing the stats of
/// spans aggregated on their AggregationKey.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub(super) struct StatsBucket {
data: HashMap<AggregationKey, GroupedStats>,
start: u64,
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/mod.rs
@@ -60,7 +60,7 @@ fn should_ignore_span(span: &pb::Span, compute_stats_by_span_kind: bool) -> bool
/// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and removes
/// all older buckets, returning their content. When force flushing, all buckets are flushed
/// regardless of their age.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct SpanConcentrator {
/// Size of the time buckets used for aggregation in nanos
bucket_size: u64,
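The Clone derives here and in aggregation.rs appear to exist so the new Criterion benchmark can clone a fresh SpanConcentrator for every batch (see span_concentrator_bench.rs above); they don't alter runtime behavior.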
8 changes: 8 additions & 0 deletions ddcommon/Cargo.toml
@@ -36,6 +36,14 @@ tokio = { version = "1.23", features = ["rt", "macros"] }
tokio-rustls = { version = "0.26", default-features = false }
serde = { version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
libc = "0.2"

[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.52"
features = [
"Win32_Foundation",
"Win32_System_Performance",
]

[target.'cfg(unix)'.dependencies]
hyper-rustls = { version = "0.27", default-features = false, features = [
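The Win32_System_Performance feature gates the QueryPerformanceFrequency/QueryPerformanceCounter bindings the new rate limiter calls on Windows (see ddcommon/src/rate_limiter.rs below); libc provides clock_gettime on unix.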
1 change: 1 addition & 0 deletions ddcommon/src/lib.rs
@@ -16,6 +16,7 @@ pub mod entity_id;
#[macro_use]
pub mod cstr;
pub mod config;
+pub mod rate_limiter;
pub mod tag;

pub mod header {
186 changes: 186 additions & 0 deletions ddcommon/src/rate_limiter.rs
@@ -0,0 +1,186 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};

pub trait Limiter {
/// Takes the limit per interval.
/// Returns false if the limit is exceeded, otherwise true.
fn inc(&self, limit: u32) -> bool;
/// Returns the effective rate per interval.
/// Note: The rate is only guaranteed to be accurate immediately after a call to inc().
fn rate(&self) -> f64;
}

/// A thread-safe limiter built on Atomics.
/// Its base unit is seconds, i.e. the minimum allowed rate is 1 per second.
/// Internally the limiter works with the system time granularity, i.e. nanoseconds on unix and
/// milliseconds on windows.
/// The implementation is a sliding window: every time the limiter is increased, the amount of time
/// that has passed is also refilled.
#[repr(C)]
pub struct LocalLimiter {
hit_count: AtomicI64,
last_update: AtomicU64,
last_limit: AtomicU32,
granularity: i64,
}

const TIME_PER_SECOND: i64 = 1_000_000_000; // nanoseconds

fn now() -> u64 {
#[cfg(windows)]
let now = unsafe {
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

let mut frequency = FREQUENCY.load(Ordering::Relaxed);
if frequency == 0 {
windows_sys::Win32::System::Performance::QueryPerformanceFrequency(
&mut frequency as *mut u64 as *mut i64,
);
FREQUENCY.store(frequency, Ordering::Relaxed);
}

let mut perf_counter = 0;
windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter);
perf_counter as u64 * frequency / TIME_PER_SECOND as u64
};
#[cfg(not(windows))]
let now = {
let mut ts: libc::timespec = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
(ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
};
now
}

impl Default for LocalLimiter {
fn default() -> Self {
LocalLimiter {
hit_count: Default::default(),
last_update: AtomicU64::from(now()),
last_limit: Default::default(),
granularity: TIME_PER_SECOND,
}
}
}

impl LocalLimiter {
/// Allows setting a custom time granularity. The default() implementation is 1 second.
pub fn with_granularity(seconds: u32) -> LocalLimiter {
let mut limiter = LocalLimiter::default();
limiter.granularity *= seconds as i64;
limiter
}

/// Resets, with a given granularity.
pub fn reset(&mut self, seconds: u32) {
self.last_update.store(now(), Ordering::Relaxed);
self.hit_count.store(0, Ordering::Relaxed);
self.last_limit.store(0, Ordering::Relaxed);
self.granularity = TIME_PER_SECOND * seconds as i64;
}
}

impl Limiter for LocalLimiter {
fn inc(&self, limit: u32) -> bool {
let now = now();
let last = self.last_update.swap(now, Ordering::SeqCst);
// Make sure reducing the limit doesn't stall for a long time
let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
let subtract = clear_counter - self.granularity;
let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
// Handle the case where the limiter goes below zero
if previous_hits < subtract {
let add = clear_counter - previous_hits.max(0);
self.hit_count.fetch_add(add, Ordering::Acquire);
previous_hits += add - clear_counter;
}
if previous_hits / self.granularity >= limit as i64 {
self.hit_count
.fetch_sub(self.granularity, Ordering::Acquire);
false
} else {
// We don't care about race conditions here:
// If the last limit was high enough to increase previous_hits, we are in any case close
// enough that the count will decrease quickly; i.e. we won't stall the limiter
// indefinitely when switching from a high to a low limit.
self.last_limit.store(limit, Ordering::Relaxed);
true
}
}

fn rate(&self) -> f64 {
let last_limit = self.last_limit.load(Ordering::Relaxed);
let hit_count = self.hit_count.load(Ordering::Relaxed);
(hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.)
}
}

#[cfg(test)]
mod tests {
use crate::rate_limiter::{Limiter, LocalLimiter, TIME_PER_SECOND};
use std::sync::atomic::Ordering;
use std::thread::sleep;
use std::time::Duration;

#[test]
#[cfg_attr(miri, ignore)]
fn test_rate_limiter() {
let limiter = LocalLimiter::default();
// Two are allowed, then one more because a small amount of time passed since the first one
assert!(limiter.inc(2));
// Work around floating point precision issues
assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5);
// Add a minimal amount of time to ensure the test doesn't run faster than timer precision
sleep(Duration::from_micros(100));
assert!(limiter.inc(2));
// We're close to 1, but not quite, due to the minimal time passed
assert!(limiter.rate() > 0.5 && limiter.rate() < 1.);
sleep(Duration::from_micros(100));
assert!(limiter.inc(2));
// Rate capped at 1
assert_eq!(1., limiter.rate());
sleep(Duration::from_micros(100));
assert!(!limiter.inc(2));
sleep(Duration::from_micros(100));
assert!(!limiter.inc(2));
sleep(Duration::from_micros(100));

// reduce 4 times, we're going into negative territory. Next increment will reset to zero.
limiter
.last_update
.fetch_sub(3 * TIME_PER_SECOND as u64, Ordering::Relaxed);
assert!(limiter.inc(2));
// Work around floating point precision issues
assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5); // We're starting from scratch
sleep(Duration::from_micros(100));
assert!(limiter.inc(2));
sleep(Duration::from_micros(100));
assert!(limiter.inc(2));
sleep(Duration::from_micros(100));
assert!(!limiter.inc(2));
sleep(Duration::from_micros(100));

// Test change to higher value
assert!(limiter.inc(3));
sleep(Duration::from_micros(100));
assert!(!limiter.inc(3));

// Then change to lower value - but we have no capacity
assert!(!limiter.inc(1));

// The counter is around 4 (because last limit was 3)
// We're keeping the highest successful limit stored, thus subtracting 3 twice will reset it
limiter
.last_update
.fetch_sub(2 * TIME_PER_SECOND as u64, Ordering::Relaxed);

// And now 1 succeeds again.
assert!(limiter.inc(1));
}
}
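For orientation: each inc() call charges one granularity's worth of time to hit_count and refunds elapsed_time * limit, so hit_count / granularity approximates the number of hits in the current window, and inc() rejects once that quotient reaches the limit. Below is a minimal usage sketch at a hypothetical call site; only LocalLimiter::default(), inc(), and rate() from the diff above are assumed.

use ddcommon::rate_limiter::{Limiter, LocalLimiter};

fn main() {
    // Budget: at most 5 hits per second, refilled continuously (sliding window).
    let limiter = LocalLimiter::default();
    for request in 0..8 {
        // inc() returns false once the per-second budget is spent.
        if limiter.inc(5) {
            println!("request {request}: allowed (rate = {:.2})", limiter.rate());
        } else {
            println!("request {request}: throttled");
        }
    }
}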
1 change: 1 addition & 0 deletions ipc/Cargo.toml
Expand Up @@ -21,6 +21,7 @@ libc = { version = "0.2" }
# tarpc needed extensions to allow 1 way communication and to export some internal structs
tarpc = { path = "tarpc/tarpc", default-features = false, features = ["serde-transport"], package = "tarpc" }

+ddcommon = { path = "../ddcommon" }
datadog-ipc-macros = { path = "macros" }

[dev-dependencies]
(Diff truncated: the remaining changed files are not rendered.)
