Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 0 additions & 150 deletions src/test_utils/insta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,155 +23,5 @@ pub fn settings() -> insta::Settings {
"UUID",
);
settings.add_filter(r"\d+\.\.\d+", "<int>..<int>");

// Since metrics are not deterministic, we replace them with <metric> in test outputs.
// Note that we leave metrics like `output_rows` since that is deterministic.
settings.add_filter(
r"elapsed_compute=[\d.]+[a-zA-Zµnms]+",
"elapsed_compute=<metric>",
);
settings.add_filter(r"spill_count=\d+", "spill_count=<metric>");
settings.add_filter(
r"spilled_bytes=[\d.]+\s*[KMGTPE]?B?",
"spilled_bytes=<metric>",
);
settings.add_filter(r"spilled_rows=\d+", "spilled_rows=<metric>");
settings.add_filter(
r"current_memory_usage=[\d.]+\s*[KMGTPE]?B?",
"current_memory_usage=<metric>",
);
settings.add_filter(
r"start_timestamp=[\d.]+[a-zA-Zµnms]*",
"start_timestamp=<metric>",
);
settings.add_filter(
r"end_timestamp=[\d.]+[a-zA-Zµnms]*",
"end_timestamp=<metric>",
);

// Common custom metric patterns
settings.add_filter(r"fetch_time=[\d.]+[a-zA-Zµnms]+", "fetch_time=<metric>");
settings.add_filter(
r"repartition_time=[\d.]+[a-zA-Zµnms]+",
"repartition_time=<metric>",
);
settings.add_filter(r"send_time=[\d.]+[a-zA-Zµnms]+", "send_time=<metric>");
settings.add_filter(r"peak_mem_used=\d+", "peak_mem_used=<metric>");
settings.add_filter(r"batches_splitted=\d+", "batches_splitted=<metric>");
settings.add_filter(r"batches_split=\d+", "batches_split=<metric>");
settings.add_filter(r"bytes_scanned=\d+", "bytes_scanned=<metric>");
settings.add_filter(r"file_open_errors=\d+", "file_open_errors=<metric>");
settings.add_filter(r"file_scan_errors=\d+", "file_scan_errors=<metric>");
settings.add_filter(
r"aggregate_arguments_time=[\d.]+[a-zA-Zµnms]+",
"aggregate_arguments_time=<metric>",
);
settings.add_filter(
r"aggregation_time=[\d.]+[a-zA-Zµnms]+",
"aggregation_time=<metric>",
);
settings.add_filter(
r"emitting_time=[\d.]+[a-zA-Zµnms]+",
"emitting_time=<metric>",
);
settings.add_filter(
r"time_calculating_group_ids=[\d.]+[a-zA-Zµnms]+",
"time_calculating_group_ids=<metric>",
);
settings.add_filter(
r"files_ranges_pruned_statistics=\d+",
"files_ranges_pruned_statistics=<metric>",
);
settings.add_filter(
r"num_predicate_creation_errors=\d+",
"num_predicate_creation_errors=<metric>",
);
settings.add_filter(
r"page_index_rows_matched=\d+",
"page_index_rows_matched=<metric>",
);
settings.add_filter(
r"page_index_rows_pruned=\d+",
"page_index_rows_pruned=<metric>",
);
settings.add_filter(
r"predicate_evaluation_errors=\d+",
"predicate_evaluation_errors=<metric>",
);
settings.add_filter(
r"pushdown_rows_matched=\d+",
"pushdown_rows_matched=<metric>",
);
settings.add_filter(r"pushdown_rows_pruned=\d+", "pushdown_rows_pruned=<metric>");
settings.add_filter(
r"row_groups_matched_bloom_filter=\d+",
"row_groups_matched_bloom_filter=<metric>",
);
settings.add_filter(
r"row_groups_matched_statistics=\d+",
"row_groups_matched_statistics=<metric>",
);
settings.add_filter(
r"row_groups_pruned_bloom_filter=\d+",
"row_groups_pruned_bloom_filter=<metric>",
);
settings.add_filter(
r"row_groups_pruned_statistics=\d+",
"row_groups_pruned_statistics=<metric>",
);
settings.add_filter(
r"bloom_filter_eval_time=[\d.]+[a-zA-Zµnms]+",
"bloom_filter_eval_time=<metric>",
);
settings.add_filter(
r"metadata_load_time=[\d.]+[a-zA-Zµnms]+",
"metadata_load_time=<metric>",
);
settings.add_filter(
r"page_index_eval_time=[\d.]+[a-zA-Zµnms]+",
"page_index_eval_time=<metric>",
);
settings.add_filter(
r"row_pushdown_eval_time=[\d.]+[a-zA-Zµnms]+",
"row_pushdown_eval_time=<metric>",
);
settings.add_filter(
r"statistics_eval_time=[\d.]+[a-zA-Zµnms]+",
"statistics_eval_time=<metric>",
);
settings.add_filter(
r"time_elapsed_opening=[\d.]+[a-zA-Zµnms]+",
"time_elapsed_opening=<metric>",
);
settings.add_filter(
r"time_elapsed_processing=[\d.]+[a-zA-Zµnms]+",
"time_elapsed_processing=<metric>",
);
settings.add_filter(
r"time_elapsed_scanning_total=[\d.]+[a-zA-Zµnms]+",
"time_elapsed_scanning_total=<metric>",
);
settings.add_filter(
r"time_elapsed_scanning_until_data=[\d.]+[a-zA-Zµnms]+",
"time_elapsed_scanning_until_data=<metric>",
);
settings.add_filter(
r"skipped_aggregation_rows=\d+",
"skipped_aggregation_rows=<metric>",
);
settings.add_filter(r"build_input_batches=\d+", "build_input_batches=<metric>");
settings.add_filter(r"build_input_rows=\d+", "build_input_rows=<metric>");
settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
settings.add_filter(
r"output_bytes=\d+.\d [(B)|(KB)|(MB)]",
"output_bytes=<metric>",
);
settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");
settings.add_filter(r"total → \d+ matched", "total → X matched");

settings
}
5 changes: 5 additions & 0 deletions src/test_utils/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use datafusion::error::DataFusionError;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

/// Adds two test tables to the provided [SessionContext]:
/// - `flights_1m`: 1M rows of flight data.
/// - `weather`: smaller dataset with weather forecast.
///
/// Useful for testing queries using SQL directly.
pub async fn register_parquet_tables(ctx: &SessionContext) -> Result<(), DataFusionError> {
ctx.register_parquet(
"flights_1m",
Expand Down
196 changes: 196 additions & 0 deletions tests/metrics_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#[cfg(all(feature = "integration", test))]
mod tests {
use datafusion::catalog::memory::DataSourceExec;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::physical_plan::{ExecutionPlan, execute_stream};
use datafusion::prelude::SessionContext;
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
use datafusion_distributed::{
DefaultSessionBuilder, DistributedExec, display_plan_ascii,
rewrite_distributed_plan_with_metrics,
};
use futures::TryStreamExt;
use itertools::Itertools;
use std::sync::Arc;

#[tokio::test]
async fn test_metrics_collection_in_aggregation() -> Result<(), Box<dyn std::error::Error>> {
    // Distributed context backed by 3 localhost workers; `_guard` keeps them alive.
    let (distributed_ctx, _guard) = start_localhost_context(3, DefaultSessionBuilder).await;
    // Plain single-node context used as the reference.
    let single_ctx = SessionContext::default();

    let query =
        r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)"#;
    let (single_plan, distributed_plan) = execute(&single_ctx, &distributed_ctx, query).await?;

    // Scan-level metrics must not depend on whether the plan ran distributed.
    assert_metrics_equal::<DataSourceExec>(
        ["output_rows", "bytes_scanned"],
        &single_plan,
        &distributed_plan,
        0,
    );

    Ok(())
}

#[tokio::test]
async fn test_metrics_collection_in_join() -> Result<(), Box<dyn std::error::Error>> {
    // Distributed context backed by 3 localhost workers; `_guard` keeps them alive.
    let (distributed_ctx, _guard) = start_localhost_context(3, DefaultSessionBuilder).await;
    // Plain single-node context used as the reference.
    let single_ctx = SessionContext::default();

    // Joins two aggregated CTEs, so the plan contains two independent scans.
    let query = r#"
    WITH a AS (
        SELECT
            AVG("MinTemp") as "MinTemp",
            "RainTomorrow"
        FROM weather
        WHERE "RainToday" = 'yes'
        GROUP BY "RainTomorrow"
    ), b AS (
        SELECT
            AVG("MaxTemp") as "MaxTemp",
            "RainTomorrow"
        FROM weather
        WHERE "RainToday" = 'no'
        GROUP BY "RainTomorrow"
    )
    SELECT
        a."MinTemp",
        b."MaxTemp"
    FROM a
    LEFT JOIN b
    ON a."RainTomorrow" = b."RainTomorrow"
    "#;

    let (single_plan, distributed_plan) = execute(&single_ctx, &distributed_ctx, query).await?;
    println!("{}", display_plan_ascii(single_plan.as_ref(), true));
    println!("{}", display_plan_ascii(distributed_plan.as_ref(), true));

    // One DataSourceExec per CTE: compare each against its single-node twin.
    for data_source_index in 0..2 {
        assert_metrics_equal::<DataSourceExec>(
            ["output_rows", "bytes_scanned"],
            &single_plan,
            &distributed_plan,
            data_source_index,
        );
    }

    Ok(())
}

#[tokio::test]
async fn test_metrics_collection_in_union() -> Result<(), Box<dyn std::error::Error>> {
    // Distributed context backed by 3 localhost workers; `_guard` keeps them alive.
    let (distributed_ctx, _guard) = start_localhost_context(3, DefaultSessionBuilder).await;
    // Plain single-node context used as the reference.
    let single_ctx = SessionContext::default();

    // Five UNION ALL branches, so the plan contains five independent scans.
    let query = r#"
    SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
    UNION ALL
    SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
    UNION ALL
    SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
    UNION ALL
    SELECT "Temp3pm", "RainToday" FROM weather WHERE "Temp3pm" < 25.0
    UNION ALL
    SELECT "Rainfall", "RainToday" FROM weather WHERE "Rainfall" > 5.0
    "#;

    let (single_plan, distributed_plan) = execute(&single_ctx, &distributed_ctx, query).await?;
    println!("{}", display_plan_ascii(single_plan.as_ref(), true));
    println!("{}", display_plan_ascii(distributed_plan.as_ref(), true));

    // One DataSourceExec per branch: compare each against its single-node twin.
    for data_source_index in 0..5 {
        assert_metrics_equal::<DataSourceExec>(
            ["output_rows", "bytes_scanned"],
            &single_plan,
            &distributed_plan,
            data_source_index,
        );
    }

    Ok(())
}

/// Finds the `index`-th node of type `T` in each of the two plan trees and
/// asserts that every metric listed in `names` has the same value on both.
///
/// Panics (via `assert_eq!` or the lookup in [node_metrics]) when a metric
/// differs or cannot be found.
fn assert_metrics_equal<T: ExecutionPlan + 'static>(
    names: impl IntoIterator<Item = &'static str>,
    one: &Arc<dyn ExecutionPlan>,
    other: &Arc<dyn ExecutionPlan>,
    index: usize,
) {
    for metric_name in names {
        assert_eq!(
            node_metrics::<T>(one, metric_name, index),
            node_metrics::<T>(other, metric_name, index),
        );
    }
}

/// Runs `query` to completion on both the single-node context (`s_ctx`) and the
/// distributed context (`d_ctx`), returning the two physical plans so their
/// metrics can be compared.
///
/// The distributed plan is post-processed with
/// `rewrite_distributed_plan_with_metrics` so that metrics gathered on remote
/// workers become visible on the returned plan tree.
async fn execute(
    s_ctx: &SessionContext,
    d_ctx: &SessionContext,
    query: &str,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>), Box<dyn std::error::Error>> {
    register_parquet_tables(s_ctx).await?;
    register_parquet_tables(d_ctx).await?;

    let s_df = s_ctx.sql(query).await?;
    let s_physical = s_df.create_physical_plan().await?;
    // Drain the stream: metrics are only populated once the plan has executed.
    execute_stream(s_physical.clone(), s_ctx.task_ctx())?
        .try_collect::<Vec<_>>()
        .await?;

    let d_df = d_ctx.sql(query).await?;
    let d_physical = d_df.create_physical_plan().await?;
    execute_stream(d_physical.clone(), d_ctx.task_ctx())?
        .try_collect::<Vec<_>>()
        .await?;
    // `d_physical` is consumed here; no clone needed (the previous `.clone()`
    // was redundant — the original Arc was never used again).
    let d_physical = rewrite_distributed_plan_with_metrics(d_physical)?;

    Ok((s_physical, d_physical))
}

/// Returns the value of the metric named `metric_name` on the `index`-th node
/// (pre-order) of `plan` whose runtime type name matches `T`.
///
/// # Panics
/// Panics if there is no such node, if the node exposes no metrics, or if the
/// metric name is not present on it (listing the metrics that are available).
fn node_metrics<T: ExecutionPlan + 'static>(
    plan: &Arc<dyn ExecutionPlan>,
    metric_name: &str,
    mut index: usize,
) -> usize {
    let mut metrics = None;
    plan.clone()
        .transform_down(|node| {
            if node.name() == T::static_name() {
                if index == 0 {
                    // Only record metrics for the node we were asked about.
                    // Recording on every match (as before) silently returned
                    // the LAST matching node's metrics when `index` exceeded
                    // the number of matches, making the comparison meaningless
                    // instead of failing loudly.
                    metrics = node.metrics();
                    return Ok(Transformed::new(node, false, TreeNodeRecursion::Stop));
                }
                index -= 1;
            }
            Ok(Transformed::no(node))
        })
        .unwrap();
    let metrics = metrics
        .unwrap_or_else(|| panic!("Could not find metrics for plan {}", T::static_name()));
    let is_distributed = plan.as_any().is::<DistributedExec>();
    metrics
        .iter()
        .find(|v| v.value().name() == metric_name)
        .unwrap_or_else(|| {
            panic!(
                "{} Could not find metric '{metric_name}' in {}. Available metrics are: {:?}",
                if is_distributed {
                    "(distributed)"
                } else {
                    "(single node)"
                },
                T::static_name(),
                metrics.iter().map(|v| v.value().name()).collect_vec()
            )
        })
        .value()
        .as_usize()
}
}