Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
insta = { workspace = true }
180 changes: 98 additions & 82 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,18 @@ fn provide_top_memory_consumers_to_error_msg(
#[cfg(test)]
mod tests {
use super::*;
use insta::{allow_duplicates, assert_snapshot, Settings};
use std::sync::Arc;

fn make_settings() -> Settings {
let mut settings = Settings::clone_current();
settings.add_filter(
r"([^\s]+)\#\d+\(can spill: (true|false)\)",
"$1#[ID](can spill: $2)",
);
settings
}

#[test]
fn test_fair() {
let pool = Arc::new(FairSpillPool::new(100)) as _;
Expand All @@ -463,10 +473,10 @@ mod tests {
assert_eq!(pool.reserved(), 4000);

let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

r1.shrink(1990);
r2.shrink(2000);
Expand All @@ -491,12 +501,12 @@ mod tests {
.register(&pool);

let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

//Shrinking r2 to zero doesn't allow a3 to allocate more than 45
r2.free();
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

// But dropping r2 does
drop(r2);
Expand All @@ -509,11 +519,13 @@ mod tests {

let mut r4 = MemoryConsumer::new("s4").register(&pool);
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
}

#[test]
fn test_tracked_consumers_pool() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
Expand Down Expand Up @@ -546,19 +558,22 @@ mod tests {
// Test: reports if new reservation causes error
// using the previously set sizes for other consumers
let mut r5 = MemoryConsumer::new("r5").register(&pool);
let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n r1#{}(can spill: false) consumed 50.0 B,\n r3#{}(can spill: false) consumed 20.0 B,\n r2#{}(can spill: false) consumed 15.0 B.\nError: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool", r1.consumer().id(), r3.consumer().id(), r2.consumer().id());
let res = r5.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected)
),
"should provide list of top memory consumers, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 50.0 B,
r3#[ID](can spill: false) consumed 20.0 B,
r2#[ID](can spill: false) consumed 15.0 B.
Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
");
}

#[test]
fn test_tracked_consumers_pool_register() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
Expand All @@ -568,15 +583,14 @@ mod tests {

// Test: see error message when no consumers recorded yet
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 0.0 B.\nError: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool", r0.consumer().id());
let res = r0.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected)
),
"should provide proper error when no reservations have been made yet, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
");

// API: multiple registrations using the same hashed consumer,
// will be recognized *differently* in the TrackConsumersPool.
Expand All @@ -586,100 +600,101 @@ mod tests {
let mut r1 = new_consumer_same_name.register(&pool);
// TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
// a followup PR will clarify this message "0 bytes already allocated for this reservation"
let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 10.0 B,\n foo#{}(can spill: false) consumed 0.0 B.\nError: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool", r0.consumer().id(), r1.consumer().id());
let res = r1.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected)
),
"should provide proper error for 2 consumers, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 10.0 B,
foo#[ID](can spill: false) consumed 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
");

// Test: will accumulate size changes per consumer, not per reservation
r1.grow(20);
let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 20.0 B,\n foo#{}(can spill: false) consumed 10.0 B.\nError: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool", r1.consumer().id(), r0.consumer().id());

let res = r1.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected)
),
"should provide proper error for 2 consumers(one foo=20.0 B, another foo=10.0 B, available=70.0 B), instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
");

// Test: different hashed consumer, (even with the same name),
// will be recognized as different in the TrackConsumersPool
let consumer_with_same_name_but_different_hash =
MemoryConsumer::new(same_name).with_can_spill(true);
let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 20.0 B,\n foo#{}(can spill: false) consumed 10.0 B,\n foo#{}(can spill: true) consumed 0.0 B.\nError: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool", r1.consumer().id(), r0.consumer().id(), r2.consumer().id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it changes the logic of the test slightly because before it'd accomodate for variable consumer ids but now those are hardcoded in your tests. This is how I solved this problem before but feel free to explore other options

settings.add_filter(r"Elapsed .* seconds\.", "[ELAPSED]");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For filtering some variable values in err message, you can also take a look at below :)

settings.add_filter(
r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B",
"Consumer(can spill: bool) consumed XB",
);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

let res = r2.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected)
),
"should provide proper error with 3 separate consumers(1 = 20 bytes, 2 = 10 bytes, 3 = 0 bytes), instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B,
foo#[ID](can spill: true) consumed 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
");
}

#[test]
fn test_tracked_consumers_pool_deregister() {
fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
// Baseline: see the 2 memory consumers
let setting = make_settings();
let _bound = setting.bind_to_scope();
let mut r0 = MemoryConsumer::new("r0").register(&pool);
r0.grow(10);
let r1_consumer = MemoryConsumer::new("r1");
let mut r1 = r1_consumer.register(&pool);
r1.grow(20);

let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n r1#{}(can spill: false) consumed 20.0 B,\n r0#{}(can spill: false) consumed 10.0 B.\nError: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool", r1.consumer().id(), r0.consumer().id());
let res = r0.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected)
),
"should provide proper error with both consumers, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 20.0 B,
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
"));

// Test: unregister one
// only the remaining one should be listed
drop(r1);
let expected_consumers = format!("Additional allocation failed with top memory consumers (across reservations) as:\n r0#{}(can spill: false) consumed 10.0 B", r0.consumer().id());
let res = r0.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected_consumers)
),
"should provide proper error with only 1 consumer left registered, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));

// Test: actual message we see is the `available is 70`. When it should be `available is 90`.
// This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
let expected_90_available = "Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool";
let res = r0.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available)
),
"should find that the inner pool will still count all bytes for the deregistered consumer until the reservation is dropped, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));

// Test: the registration needs to free itself (or be dropped),
// for the proper error message
let expected_90_available = "Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool";
let res = r0.try_grow(150);
assert!(
matches!(
&res,
Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available)
),
"should correctly account the total bytes after reservation is free, instead found {res:?}"
);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
}

let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
Expand All @@ -697,6 +712,8 @@ mod tests {

#[test]
fn test_tracked_consumers_pool_use_beyond_errors() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let upcasted: Arc<dyn std::any::Any + Send + Sync> =
Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
Expand All @@ -720,11 +737,10 @@ mod tests {
.unwrap();

// Test: can get runtime metrics, even without an error thrown
let expected = format!(" r3#{}(can spill: false) consumed 45.0 B,\n r1#{}(can spill: false) consumed 20.0 B.", r3.consumer().id(), r1.consumer().id());
let res = downcasted.report_top(2);
assert_eq!(
res, expected,
"should provide list of top memory consumers, instead found {res:?}"
);
assert_snapshot!(res, @r"
r3#[ID](can spill: false) consumed 45.0 B,
r1#[ID](can spill: false) consumed 20.0 B.
");
}
}