diff --git a/Cargo.lock b/Cargo.lock index 37d67fff9ce2..93dd9e885944 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2190,6 +2190,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", + "insta", "log", "object_store", "parking_lot", diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 20e507e98b68..5988d3a33660 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -52,3 +52,4 @@ url = { workspace = true } [dev-dependencies] chrono = { workspace = true } +insta = { workspace = true } diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 32ad3a586390..11467f69be1c 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -443,8 +443,18 @@ fn provide_top_memory_consumers_to_error_msg( #[cfg(test)] mod tests { use super::*; + use insta::{allow_duplicates, assert_snapshot, Settings}; use std::sync::Arc; + fn make_settings() -> Settings { + let mut settings = Settings::clone_current(); + settings.add_filter( + r"([^\s]+)\#\d+\(can spill: (true|false)\)", + "$1#[ID](can spill: $2)", + ); + settings + } + #[test] fn test_fair() { let pool = Arc::new(FairSpillPool::new(100)) as _; @@ -463,10 +473,10 @@ mod tests { assert_eq!(pool.reserved(), 4000); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool"); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool"); r1.shrink(1990); r2.shrink(2000); @@ -491,12 +501,12 @@ mod tests { .register(&pool); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool"); //Shrinking r2 to zero doesn't allow a3 to allocate more than 45 r2.free(); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool"); // But dropping r2 does drop(r2); @@ -509,11 +519,13 @@ mod tests { let mut r4 = MemoryConsumer::new("s4").register(&pool); let err = r4.try_grow(30).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool"); } #[test] fn test_tracked_consumers_pool() { + let setting = make_settings(); + let _bound = setting.bind_to_scope(); let pool: Arc = Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(100), NonZeroUsize::new(3).unwrap(), @@ -546,19 +558,22 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n r1#{}(can spill: false) consumed 50.0 B,\n r3#{}(can spill: false) consumed 20.0 B,\n r2#{}(can spill: false) consumed 15.0 B.\nError: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool", r1.consumer().id(), r3.consumer().id(), r2.consumer().id()); let res = r5.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected) - ), - "should provide list of top memory consumers, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + r1#[ID](can spill: false) consumed 50.0 B, + r3#[ID](can spill: false) consumed 20.0 B, + r2#[ID](can spill: false) consumed 15.0 B. + Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool + "); } #[test] fn test_tracked_consumers_pool_register() { + let setting = make_settings(); + let _bound = setting.bind_to_scope(); let pool: Arc = Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(100), NonZeroUsize::new(3).unwrap(), @@ -568,15 +583,14 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 0.0 B.\nError: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool", r0.consumer().id()); let res = r0.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected) - ), - "should provide proper error when no reservations have been made yet, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + foo#[ID](can spill: false) consumed 0.0 B. + Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool + "); // API: multiple registrations using the same hashed consumer, // will be recognized *differently* in the TrackConsumersPool. @@ -586,100 +600,101 @@ mod tests { let mut r1 = new_consumer_same_name.register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 10.0 B,\n foo#{}(can spill: false) consumed 0.0 B.\nError: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool", r0.consumer().id(), r1.consumer().id()); let res = r1.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected) - ), - "should provide proper error for 2 consumers, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + foo#[ID](can spill: false) consumed 10.0 B, + foo#[ID](can spill: false) consumed 0.0 B. + Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool + "); // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 20.0 B,\n foo#{}(can spill: false) consumed 10.0 B.\nError: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool", r1.consumer().id(), r0.consumer().id()); + let res = r1.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected) - ), - "should provide proper error for 2 consumers(one foo=20.0 B, another foo=10.0 B, available=70.0 B), instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + foo#[ID](can spill: false) consumed 20.0 B, + foo#[ID](can spill: false) consumed 10.0 B. + Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool + "); // Test: different hashed consumer, (even with the same name), // will be recognized as different in the TrackConsumersPool let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n foo#{}(can spill: false) consumed 20.0 B,\n foo#{}(can spill: false) consumed 10.0 B,\n foo#{}(can spill: true) consumed 0.0 B.\nError: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool", r1.consumer().id(), r0.consumer().id(), r2.consumer().id()); let res = r2.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected) - ), - "should provide proper error with 3 separate consumers(1 = 20 bytes, 2 = 10 bytes, 3 = 0 bytes), instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + foo#[ID](can spill: false) consumed 20.0 B, + foo#[ID](can spill: false) consumed 10.0 B, + foo#[ID](can spill: true) consumed 0.0 B. + Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool + "); } #[test] fn test_tracked_consumers_pool_deregister() { fn test_per_pool_type(pool: Arc) { // Baseline: see the 2 memory consumers + let setting = make_settings(); + let _bound = setting.bind_to_scope(); let mut r0 = MemoryConsumer::new("r0").register(&pool); r0.grow(10); let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.register(&pool); r1.grow(20); - let expected = format!("Additional allocation failed with top memory consumers (across reservations) as:\n r1#{}(can spill: false) consumed 20.0 B,\n r0#{}(can spill: false) consumed 10.0 B.\nError: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool", r1.consumer().id(), r0.consumer().id()); let res = r0.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected) - ), - "should provide proper error with both consumers, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + allow_duplicates!(assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + r1#[ID](can spill: false) consumed 20.0 B, + r0#[ID](can spill: false) consumed 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool + ")); // Test: unregister one // only the remaining one should be listed drop(r1); - let expected_consumers = format!("Additional allocation failed with top memory consumers (across reservations) as:\n r0#{}(can spill: false) consumed 10.0 B", r0.consumer().id()); let res = r0.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(&expected_consumers) - ), - "should provide proper error with only 1 consumer left registered, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + allow_duplicates!(assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + r0#[ID](can spill: false) consumed 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool + ")); // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). - let expected_90_available = "Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool"; let res = r0.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available) - ), - "should find that the inner pool will still count all bytes for the deregistered consumer until the reservation is dropped, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + allow_duplicates!(assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + r0#[ID](can spill: false) consumed 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool + ")); // Test: the registration needs to free itself (or be dropped), // for the proper error message - let expected_90_available = "Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool"; let res = r0.try_grow(150); - assert!( - matches!( - &res, - Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available) - ), - "should correctly account the total bytes after reservation is free, instead found {res:?}" - ); + assert!(res.is_err()); + let error = res.unwrap_err().strip_backtrace(); + allow_duplicates!(assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + r0#[ID](can spill: false) consumed 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool + ")); } let tracked_spill_pool: Arc = Arc::new(TrackConsumersPool::new( @@ -697,6 +712,8 @@ mod tests { #[test] fn test_tracked_consumers_pool_use_beyond_errors() { + let setting = make_settings(); + let _bound = setting.bind_to_scope(); let upcasted: Arc = Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(100), @@ -720,11 +737,10 @@ mod tests { .unwrap(); // Test: can get runtime metrics, even without an error thrown - let expected = format!(" r3#{}(can spill: false) consumed 45.0 B,\n r1#{}(can spill: false) consumed 20.0 B.", r3.consumer().id(), r1.consumer().id()); let res = downcasted.report_top(2); - assert_eq!( - res, expected, - "should provide list of top memory consumers, instead found {res:?}" - ); + assert_snapshot!(res, @r" + r3#[ID](can spill: false) consumed 45.0 B, + r1#[ID](can spill: false) consumed 20.0 B. + "); } }