Commit 970baa6

consume observations on drop of iterator (#1362)
- consume observations on drop of iterator
- add test for anyhow bailout
- `upscale_values` can not fail
- Add missing comment

Co-authored-by: florian.engelhardt <florian.engelhardt@datadoghq.com>

1 parent 4c350fa, commit 970baa6

3 files changed: +119 lines, -33 lines

libdd-profiling/src/internal/observation/observations.rs

Lines changed: 33 additions & 3 deletions

@@ -115,9 +115,10 @@ impl Observations {
             .try_into_iter()?
             .map(|(s, t, o)| (s, Some(t), o));
 
-        let agg_it = std::mem::take(&mut aggregated_data.data)
-            .into_iter()
-            .map(move |(s, o)| (s, None, unsafe { o.into_vec(obs_len) }));
+        let agg_it = AggregatedObservationsIter {
+            iter: std::mem::take(&mut aggregated_data.data).into_iter(),
+            obs_len,
+        };
 
         Ok(ObservationsIntoIter {
             it: Box::new(ts_it.chain(agg_it)),
@@ -195,6 +196,35 @@ impl Drop for AggregatedObservations {
     }
 }
 
+/// This iterator only exists to make the drop work, i.e. consume the rest of the iterator on
+/// drop to clean up and not leak memory.
+struct AggregatedObservationsIter {
+    iter: std::collections::hash_map::IntoIter<Sample, TrimmedObservation>,
+    obs_len: ObservationLength,
+}
+
+impl Iterator for AggregatedObservationsIter {
+    type Item = (Sample, Option<Timestamp>, Vec<i64>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let (sample, observation) = self.iter.next()?;
+        // SAFETY: The only way to build one of these is through
+        // [Observations::add], which already checked that the length was correct.
+        let vec = unsafe { observation.into_vec(self.obs_len) };
+        Some((sample, None, vec))
+    }
+}
+
+impl Drop for AggregatedObservationsIter {
+    fn drop(&mut self) {
+        for (_, observation) in &mut self.iter {
+            // SAFETY: The only way to build one of these is through
+            // [Observations::add], which already checked that the length was correct.
+            unsafe { observation.consume(self.obs_len) };
+        }
+    }
+}
+
 pub struct ObservationsIntoIter {
     it: Box<dyn Iterator<Item = <ObservationsIntoIter as Iterator>::Item>>,
 }
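
Why the wrapper: previously `agg_it` was a bare `map` over the drained hash map, so if `ObservationsIntoIter` got dropped before being fully consumed (for instance when serialization bails out with an anyhow error partway through, which the new test in profile/mod.rs exercises), the remaining `TrimmedObservation`s were never consumed. Per the new doc comment, the `Drop` impl exists precisely to drain that remainder instead of leaking it. Below is a minimal, standalone sketch of the same pattern; `RawObservation` and `DrainingIter` are made-up stand-ins for illustration, not the library's types.

use std::collections::hash_map::IntoIter;
use std::collections::HashMap;
use std::mem::ManuallyDrop;

// Stand-in for TrimmedObservation: a buffer that will not free itself on drop,
// so it leaks unless someone explicitly takes the Vec back out.
struct RawObservation(ManuallyDrop<Vec<i64>>);

impl RawObservation {
    fn new(values: Vec<i64>) -> Self {
        RawObservation(ManuallyDrop::new(values))
    }

    fn into_vec(mut self) -> Vec<i64> {
        // SAFETY: `self` is consumed here, so the ManuallyDrop is never used again.
        unsafe { ManuallyDrop::take(&mut self.0) }
    }
}

// Wrapper iterator in the spirit of AggregatedObservationsIter: it yields the
// reclaimed Vecs and drains whatever is left over when it is dropped.
struct DrainingIter {
    iter: IntoIter<&'static str, RawObservation>,
}

impl Iterator for DrainingIter {
    type Item = (&'static str, Vec<i64>);

    fn next(&mut self) -> Option<Self::Item> {
        let (key, obs) = self.iter.next()?;
        Some((key, obs.into_vec()))
    }
}

impl Drop for DrainingIter {
    fn drop(&mut self) {
        // Reclaim (and immediately drop) every entry the caller never consumed;
        // without this loop those buffers would be leaked.
        for (_key, obs) in &mut self.iter {
            drop(obs.into_vec());
        }
    }
}

fn main() {
    let mut data = HashMap::new();
    data.insert("a", RawObservation::new(vec![1, 2, 3]));
    data.insert("b", RawObservation::new(vec![4, 5, 6]));

    let mut it = DrainingIter { iter: data.into_iter() };
    let _first = it.next(); // simulate a caller that bails out early
    drop(it); // Drop drains the remaining entry, so nothing leaks
}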

libdd-profiling/src/internal/profile/mod.rs

Lines changed: 58 additions & 1 deletion

@@ -424,7 +424,7 @@ impl Profile {
             .map(Id::to_raw_id)
             .collect();
         self.check_location_ids_are_valid(&location_ids, self.locations.len())?;
-        self.upscaling_rules.upscale_values(&mut values, labels)?;
+        self.upscaling_rules.upscale_values(&mut values, labels);
 
         // Use the extra slot in the labels vector to store the timestamp without any reallocs.
         if let Some(ts) = timestamp {
@@ -2605,4 +2605,61 @@ mod api_tests {
             .iter()
             .any(|s| s == "world"));
     }
+
+    #[test]
+    fn reproduce_crash_with_anyhow_bailout() {
+        let sample_types = [api::ValueType::new("samples", "count")];
+        let mapping = api::Mapping {
+            filename: "test.php",
+            ..Default::default()
+        };
+
+        let mut profile = Profile::new(&sample_types, None);
+
+        let locations = vec![api::Location {
+            mapping,
+            function: api::Function {
+                name: "test_function",
+                system_name: "test_function",
+                filename: "test.php",
+            },
+            line: 0,
+            ..Default::default()
+        }];
+
+        let sample = api::Sample {
+            locations,
+            values: &[1],
+            labels: vec![api::Label {
+                key: "iteration",
+                num: 1,
+                ..Default::default()
+            }],
+        };
+
+        profile
+            .try_add_sample(sample, None) // No timestamp = aggregated
+            .expect("profile to not be full");
+
+        // We want to trigger an error inside the loop over observations.
+        // By clearing the locations, the location IDs referenced by the samples/stacktraces
+        // will become invalid (checking against len=0), causing check_location_ids_are_valid to
+        // fail.
+        profile.locations.clear();
+
+        let result = profile.serialize_into_compressed_pprof(None, None);
+
+        match result {
+            Ok(_) => panic!(
+                "Expected serialization to fail due to invalid location IDs, but it succeeded"
+            ),
+            Err(err) => {
+                assert!(
+                    err.to_string().contains("invalid location id"),
+                    "Expected error about invalid location id, got: {}",
+                    err
+                );
+            }
+        }
+    }
 }
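
The test drives exactly the bailout the iterator fix is for: it adds one aggregated sample (no timestamp), invalidates the location IDs by clearing `profile.locations`, and then serializes, so `check_location_ids_are_valid` fails with an anyhow error while the observations iterator is only partially consumed. The first hunk is the caller side of the upscaling change: `upscale_values` no longer returns a `Result`, so its `?` is gone. A rough sketch of how such a bailout drops a half-consumed iterator follows; the `serialize_samples` name and signature are invented for illustration (assuming the `anyhow` crate), not the library's real serializer.

use anyhow::bail;

// Invented shape of the serialization loop: the first invalid location id
// returns early via `bail!`, and the half-consumed `observations` iterator is
// dropped right there -- which is the moment the new Drop impl on
// AggregatedObservationsIter has to clean up.
fn serialize_samples(
    observations: impl Iterator<Item = (u64, Vec<i64>)>,
    num_locations: u64,
) -> anyhow::Result<()> {
    for (location_id, _values) in observations {
        if location_id >= num_locations {
            bail!("invalid location id {location_id}");
        }
        // ... encode the sample into the pprof output ...
    }
    Ok(())
}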

libdd-profiling/src/internal/upscaling.rs

Lines changed: 28 additions & 29 deletions

@@ -165,37 +165,36 @@ impl UpscalingRules {
         self.rules.is_empty()
     }
 
-    pub fn upscale_values(&self, values: &mut [i64], labels: &[Label]) -> anyhow::Result<()> {
-        if !self.is_empty() {
-            // get bylabel rules first (if any)
-            let mut group_of_rules = labels
-                .iter()
-                .filter_map(|label| {
-                    self.get(&(
-                        label.get_key(),
-                        match label.get_value() {
-                            LabelValue::Str(str) => *str,
-                            LabelValue::Num { .. } => StringId::ZERO,
-                        },
-                    ))
-                })
-                .collect::<Vec<&Vec<UpscalingRule>>>();
-
-            // get byvalue rules if any
-            if let Some(byvalue_rules) = self.get(&(StringId::ZERO, StringId::ZERO)) {
-                group_of_rules.push(byvalue_rules);
-            }
+    pub fn upscale_values(&self, values: &mut [i64], labels: &[Label]) {
+        if self.is_empty() {
+            return;
+        }
+        // get bylabel rules first (if any)
+        let mut group_of_rules = labels
+            .iter()
+            .filter_map(|label| {
+                self.get(&(
+                    label.get_key(),
+                    match label.get_value() {
+                        LabelValue::Str(str) => *str,
+                        LabelValue::Num { .. } => StringId::ZERO,
+                    },
+                ))
+            })
+            .collect::<Vec<&Vec<UpscalingRule>>>();
 
-            group_of_rules.iter().for_each(|rules| {
-                rules.iter().for_each(|rule| {
-                    let scale = rule.compute_scale(values);
-                    rule.values_offset.iter().for_each(|offset| {
-                        values[*offset] = (values[*offset] as f64 * scale).round() as i64
-                    })
-                })
-            });
+        // get byvalue rules if any
+        if let Some(byvalue_rules) = self.get(&(StringId::ZERO, StringId::ZERO)) {
+            group_of_rules.push(byvalue_rules);
         }
 
-        Ok(())
+        group_of_rules.iter().for_each(|rules| {
+            rules.iter().for_each(|rule| {
+                let scale = rule.compute_scale(values);
+                rule.values_offset.iter().for_each(|offset| {
+                    values[*offset] = (values[*offset] as f64 * scale).round() as i64
+                })
+            })
+        });
     }
 }
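
The rationale, per the commit message ("`upscale_values` can not fail"): the body only looks up rules and scales values in place, nothing in it can error, so the `anyhow::Result<()>` return and the trailing `Ok(())` were noise, and the early return on the empty case drops one indentation level. For reference, here is a standalone illustration of the scaling step itself; the offsets and scale value are made up and not taken from any real rule.

// The rounding formula from the loop above, pulled out on its own.
fn apply_scale(values: &mut [i64], offsets: &[usize], scale: f64) {
    for &offset in offsets {
        values[offset] = (values[offset] as f64 * scale).round() as i64;
    }
}

fn main() {
    let mut values = vec![10, 7, 3];
    // e.g. a rule whose compute_scale() came out to 2.5, applied to offsets 0 and 2
    apply_scale(&mut values, &[0, 2], 2.5);
    assert_eq!(values, vec![25, 7, 8]); // 3 * 2.5 = 7.5 rounds to 8
}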
