From cc0873d35253083b78920dab1c81f78b6da71e71 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Sat, 22 Feb 2025 23:47:04 +0800 Subject: [PATCH 1/7] Inital version for streaming batch preview --- datafusion-cli/src/exec.rs | 203 ++++++++++++++++++++++++++-- datafusion-cli/src/print_options.rs | 2 +- 2 files changed, 190 insertions(+), 15 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index d560dee987f5..e2399dcfb80a 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -31,7 +31,10 @@ use std::collections::HashMap; use std::fs::File; use std::io::prelude::*; use std::io::BufReader; - +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; +use arrow::util::display::{ArrayFormatter, ValueFormatter}; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; use datafusion::config::ConfigFileType; @@ -49,6 +52,7 @@ use datafusion::sql::sqlparser; use rustyline::error::ReadlineError; use rustyline::Editor; use tokio::signal; +use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; /// run and execute SQL statements and commands, against a context with the given print options pub async fn exec_from_commands( @@ -256,27 +260,83 @@ pub(super) async fn exec_and_print( // Bounded stream; collected results size is limited by the maxrows option let schema = physical_plan.schema(); let mut stream = execute_stream(physical_plan, task_ctx.clone())?; - let mut results = vec![]; - let mut row_count = 0_usize; let max_rows = match print_options.maxrows { MaxRows::Unlimited => usize::MAX, MaxRows::Limited(n) => n, }; + + let preview_limit: usize = 1000; + let mut preview_batches: Vec = vec![]; + let mut preview_row_count = 0_usize; + let mut total_printed = 0_usize; + let mut precomputed_widths: Option> = None; + let mut header_printed = false; + + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + + // 循环处理流中每个 batch while let Some(batch) = stream.next().await { let batch = batch?; - let curr_num_rows = batch.num_rows(); - // Stop collecting results if the number of rows exceeds the limit - // results batch should include the last batch that exceeds the limit - if row_count < max_rows + curr_num_rows { - // Try to grow the reservation to accommodate the batch in memory - reservation.try_grow(get_record_batch_memory_size(&batch))?; - results.push(batch); + let batch_rows = batch.num_rows(); + + // 判断是否超出最大行数限制 + if total_printed + batch_rows > max_rows { + // 只处理需要的部分行 + let needed = max_rows - total_printed; + let batch = batch.slice(0, needed); + process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + )?; + total_printed += needed; + break; + } else { + process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + )?; + total_printed += batch_rows; } - row_count += curr_num_rows; } - adjusted - .into_inner() - .print_batches(schema, &results, now, row_count)?; + + // 若仍未达到预览阈值,则延后计算并打印(例如总行数非常少) + if precomputed_widths.is_none() && !preview_batches.is_empty() { + let widths = compute_column_widths(&preview_batches, schema.clone())?; + precomputed_widths = Some(widths.clone()); + print_header(&schema, &widths, &mut writer)?; + header_printed = true; + for batch in preview_batches.iter() { + print_batch_with_widths(batch, &widths, &mut writer)?; + } + } + + // // 打印执行详情(可选) + // let formatted_exec_details = get_execution_details_formatted( + // total_printed, + // if print_options.format == PrintFormat::Table { + // print_options.maxrows + // } else { + // MaxRows::Unlimited + // }, + // Instant::now(), // 或传入查询开始时间 + // ); + + // if !print_options.quiet { + // writeln!(writer, "{}", formatted_exec_details)?; + // } reservation.free(); } } @@ -284,6 +344,121 @@ pub(super) async fn exec_and_print( Ok(()) } +fn process_batch( + batch: &RecordBatch, + schema: SchemaRef, + preview_batches: &mut Vec, + preview_row_count: &mut usize, + preview_limit: usize, + precomputed_widths: &mut Option>, + header_printed: &mut bool, + writer: &mut impl std::io::Write, +) -> Result<()> { + if precomputed_widths.is_none() { + // 仍处于预览阶段 + preview_batches.push(batch.clone()); + *preview_row_count += batch.num_rows(); + if *preview_row_count >= preview_limit { + // 达到预览阈值,计算列宽 + let widths = compute_column_widths(preview_batches, schema.clone())?; + *precomputed_widths = Some(widths.clone()); + // 打印表头 + print_header(&schema, &widths, writer)?; + *header_printed = true; + // 打印预览阶段缓存的所有 batch + for preview_batch in preview_batches.drain(..) { + print_batch_with_widths(&preview_batch, &widths, writer)?; + } + } + } else { + // 预览阶段结束,直接流式打印 + let widths = precomputed_widths.as_ref().unwrap(); + if !*header_printed { + print_header(&schema, widths, writer)?; + *header_printed = true; + } + print_batch_with_widths(batch, widths, writer)?; + } + Ok(()) +} + +/// 根据预览批次计算各列的最大宽度(考虑表头和各行值的长度) +fn compute_column_widths(batches: &Vec, schema: SchemaRef) -> Result> { + // 以表头作为初始宽度 + let mut widths: Vec = schema + .fields() + .iter() + .map(|f| f.name().len()) + .collect(); + // 遍历所有 batch 计算各列宽度 + for batch in batches { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + for (i, formatter) in formatters.iter().enumerate() { + let cell = formatter.value(row); + widths[i] = widths[i].max(cell.to_string().len()); + } + } + } + Ok(widths) +} + +/// 打印表头,并使用分隔符 +fn print_header( + schema: &SchemaRef, + widths: &Vec, + writer: &mut impl std::io::Write, +) -> Result<()> { + let header: Vec = schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| pad_cell(field.name(), widths[i])) + .collect(); + writeln!(writer, "{}", header.join(" | "))?; + // 打印分隔行 + let separator: Vec = widths.iter().map(|w| "-".repeat(*w)).collect(); + writeln!(writer, "{}", separator.join("-+-"))?; + Ok(()) +} + +/// 根据预计算的列宽打印 batch 中的所有行 +fn print_batch_with_widths( + batch: &RecordBatch, + widths: &Vec, + writer: &mut impl std::io::Write, +) -> Result<()> { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + let cells: Vec = formatters + .iter() + .enumerate() + .map(|(i, formatter)| pad_value(&formatter.value(row), widths[i])) + .collect(); + writeln!(writer, "{}", cells.join(" | "))?; + } + Ok(()) +} + +/// 为单元格内容补齐空格(左对齐) +fn pad_cell(cell: &str, width: usize) -> String { + format!("{: String { + let s = formatter.try_to_string().unwrap_or_default(); + format!("{: Date: Tue, 25 Feb 2025 00:53:41 +0800 Subject: [PATCH 2/7] Add more reasonable code --- datafusion-cli/src/exec.rs | 224 +++++++++++++++--------- datafusion-cli/src/print_options.rs | 47 ++--- datafusion-cli/tests/cli_integration.rs | 1 + 3 files changed, 164 insertions(+), 108 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index e2399dcfb80a..85d05213936a 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -26,11 +26,6 @@ use crate::{ object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, }; -use futures::StreamExt; -use std::collections::HashMap; -use std::fs::File; -use std::io::prelude::*; -use std::io::BufReader; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; @@ -45,14 +40,19 @@ use datafusion::physical_plan::execution_plan::EmissionType; use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties}; use datafusion::sql::parser::{DFParser, Statement}; use datafusion::sql::sqlparser::dialect::dialect_from_str; +use futures::StreamExt; +use std::collections::HashMap; +use std::fs::File; +use std::io::prelude::*; +use std::io::BufReader; +use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; use datafusion::execution::memory_pool::MemoryConsumer; use datafusion::physical_plan::spill::get_record_batch_memory_size; use datafusion::sql::sqlparser; use rustyline::error::ReadlineError; use rustyline::Editor; use tokio::signal; -use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; /// run and execute SQL statements and commands, against a context with the given print options pub async fn exec_from_commands( @@ -232,20 +232,18 @@ pub(super) async fn exec_and_print( let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { - let adjusted = - AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement); - let plan = create_plan(ctx, statement).await?; - let adjusted = adjusted.with_plan(&plan); let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; // Track memory usage for the query result if it's bounded - let mut reservation = - MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool()); + let mut reservation = MemoryConsumer::new("DataFusion-Cli") + .with_can_spill(false) + .register(task_ctx.memory_pool()); - if physical_plan.boundedness().is_unbounded() { + // Both bounded and unbounded streams are streaming prints + if print_options.format != PrintFormat::Table { if physical_plan.pipeline_behavior() == EmissionType::Final { return plan_err!( "The given query can generate a valid result only once \ @@ -268,75 +266,101 @@ pub(super) async fn exec_and_print( let preview_limit: usize = 1000; let mut preview_batches: Vec = vec![]; let mut preview_row_count = 0_usize; - let mut total_printed = 0_usize; + let mut total_count = 0_usize; let mut precomputed_widths: Option> = None; let mut header_printed = false; + let mut max_rows_reached = false; let stdout = std::io::stdout(); let mut writer = stdout.lock(); - // 循环处理流中每个 batch while let Some(batch) = stream.next().await { let batch = batch?; let batch_rows = batch.num_rows(); - // 判断是否超出最大行数限制 - if total_printed + batch_rows > max_rows { - // 只处理需要的部分行 - let needed = max_rows - total_printed; - let batch = batch.slice(0, needed); - process_batch( - &batch, - schema.clone(), - &mut preview_batches, - &mut preview_row_count, - preview_limit, - &mut precomputed_widths, - &mut header_printed, - &mut writer, - )?; - total_printed += needed; - break; - } else { - process_batch( - &batch, - schema.clone(), - &mut preview_batches, - &mut preview_row_count, - preview_limit, - &mut precomputed_widths, - &mut header_printed, - &mut writer, - )?; - total_printed += batch_rows; + if !max_rows_reached { + if total_count < max_rows { + if total_count + batch_rows > max_rows { + let needed = max_rows - total_count; + let batch_to_print = batch.slice(0, needed); + process_batch( + &batch_to_print, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + )?; + if precomputed_widths.is_none() { + let widths = compute_column_widths( + &preview_batches, + schema.clone(), + )?; + precomputed_widths = Some(widths.clone()); + if !header_printed { + print_header(&schema, &widths, &mut writer)?; + header_printed = true; + } + for preview_batch in preview_batches.drain(..) { + print_batch_with_widths( + &preview_batch, + &widths, + &mut writer, + )?; + } + } + if let Some(ref widths) = precomputed_widths { + for _ in 0..3 { + print_dotted_line(widths, &mut writer)?; + } + print_bottom_border(widths, &mut writer)?; + } + max_rows_reached = true; + } else { + process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + )?; + } + } } + total_count += batch_rows; } - // 若仍未达到预览阈值,则延后计算并打印(例如总行数非常少) - if precomputed_widths.is_none() && !preview_batches.is_empty() { - let widths = compute_column_widths(&preview_batches, schema.clone())?; - precomputed_widths = Some(widths.clone()); - print_header(&schema, &widths, &mut writer)?; - header_printed = true; - for batch in preview_batches.iter() { - print_batch_with_widths(batch, &widths, &mut writer)?; + if !max_rows_reached { + if precomputed_widths.is_none() && !preview_batches.is_empty() { + let widths = compute_column_widths(&preview_batches, schema.clone())?; + precomputed_widths = Some(widths); + if !header_printed { + print_header( + &schema, + precomputed_widths.as_ref().unwrap(), + &mut writer, + )?; + header_printed = true; + } + } + if let Some(ref widths) = precomputed_widths { + print_bottom_border(widths, &mut writer)?; } } - // // 打印执行详情(可选) - // let formatted_exec_details = get_execution_details_formatted( - // total_printed, - // if print_options.format == PrintFormat::Table { - // print_options.maxrows - // } else { - // MaxRows::Unlimited - // }, - // Instant::now(), // 或传入查询开始时间 - // ); - - // if !print_options.quiet { - // writeln!(writer, "{}", formatted_exec_details)?; - // } + let formatted_exec_details = print_options.get_execution_details_formatted( + total_count, + print_options.maxrows, + now, + ); + if !print_options.quiet { + writeln!(writer, "{}", formatted_exec_details)?; + } reservation.free(); } } @@ -355,23 +379,18 @@ fn process_batch( writer: &mut impl std::io::Write, ) -> Result<()> { if precomputed_widths.is_none() { - // 仍处于预览阶段 preview_batches.push(batch.clone()); *preview_row_count += batch.num_rows(); if *preview_row_count >= preview_limit { - // 达到预览阈值,计算列宽 let widths = compute_column_widths(preview_batches, schema.clone())?; *precomputed_widths = Some(widths.clone()); - // 打印表头 print_header(&schema, &widths, writer)?; *header_printed = true; - // 打印预览阶段缓存的所有 batch for preview_batch in preview_batches.drain(..) { print_batch_with_widths(&preview_batch, &widths, writer)?; } } } else { - // 预览阶段结束,直接流式打印 let widths = precomputed_widths.as_ref().unwrap(); if !*header_printed { print_header(&schema, widths, writer)?; @@ -382,14 +401,13 @@ fn process_batch( Ok(()) } -/// 根据预览批次计算各列的最大宽度(考虑表头和各行值的长度) -fn compute_column_widths(batches: &Vec, schema: SchemaRef) -> Result> { +/// Compute the maximum width of each column in the given batches +fn compute_column_widths( + batches: &Vec, + schema: SchemaRef, +) -> Result> { // 以表头作为初始宽度 - let mut widths: Vec = schema - .fields() - .iter() - .map(|f| f.name().len()) - .collect(); + let mut widths: Vec = schema.fields().iter().map(|f| f.name().len()).collect(); // 遍历所有 batch 计算各列宽度 for batch in batches { let formatters = batch @@ -407,26 +425,33 @@ fn compute_column_widths(batches: &Vec, schema: SchemaRef) -> Resul Ok(widths) } -/// 打印表头,并使用分隔符 +/// Print the header of a table fn print_header( schema: &SchemaRef, widths: &Vec, writer: &mut impl std::io::Write, ) -> Result<()> { + print_border(widths, writer)?; + let header: Vec = schema .fields() .iter() .enumerate() .map(|(i, field)| pad_cell(field.name(), widths[i])) .collect(); - writeln!(writer, "{}", header.join(" | "))?; - // 打印分隔行 - let separator: Vec = widths.iter().map(|w| "-".repeat(*w)).collect(); - writeln!(writer, "{}", separator.join("-+-"))?; + writeln!(writer, "| {} |", header.join(" | "))?; + + print_border(widths, writer)?; Ok(()) } -/// 根据预计算的列宽打印 batch 中的所有行 +fn print_border(widths: &Vec, writer: &mut impl std::io::Write) -> Result<()> { + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) +} + +/// Print a batch of records with the given widths for each column fn print_batch_with_widths( batch: &RecordBatch, widths: &Vec, @@ -443,7 +468,8 @@ fn print_batch_with_widths( .enumerate() .map(|(i, formatter)| pad_value(&formatter.value(row), widths[i])) .collect(); - writeln!(writer, "{}", cells.join(" | "))?; + // 修改这里:在前后各加一个 "|" 以保持和 header 格式一致 + writeln!(writer, "| {} |", cells.join(" | "))?; } Ok(()) } @@ -458,6 +484,34 @@ fn pad_value(formatter: &ValueFormatter, width: usize) -> String { format!("{:, + writer: &mut impl std::io::Write, +) -> Result<()> { + // 构造每个单元格,点号左对齐,长度与对应宽度相同 + let cells: Vec = widths + .iter() + .map(|&w| format!(" {: , + writer: &mut impl std::io::Write, +) -> Result<()> { + // 构造每个单元格对应的边框部分,例如 "+------------+" + let cells: Vec = widths + .iter() + .map(|&w| "-".repeat(w + 2)) // 加2可以对齐左右两侧的空格 + .collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) +} /// Track adjustments to the print options based on the plan / statement being executed #[derive(Debug)] diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 3cb008e5c9c1..89ce260270e8 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -74,27 +74,6 @@ pub struct PrintOptions { pub color: bool, } -// Returns the query execution details formatted -pub fn get_execution_details_formatted( - row_count: usize, - maxrows: MaxRows, - query_start_time: Instant, -) -> String { - let nrows_shown_msg = match maxrows { - MaxRows::Limited(nrows) if nrows < row_count => { - format!("(First {nrows} displayed. Use --maxrows to adjust)") - } - _ => String::new(), - }; - - format!( - "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n", - row_count, - nrows_shown_msg, - query_start_time.elapsed().as_secs_f64() - ) -} - impl PrintOptions { /// Print the batches to stdout using the specified format pub fn print_batches( @@ -110,7 +89,7 @@ impl PrintOptions { self.format .print_batches(&mut writer, schema, batches, self.maxrows, true)?; - let formatted_exec_details = get_execution_details_formatted( + let formatted_exec_details = self.get_execution_details_formatted( row_count, if self.format == PrintFormat::Table { self.maxrows @@ -158,7 +137,7 @@ impl PrintOptions { with_header = false; } - let formatted_exec_details = get_execution_details_formatted( + let formatted_exec_details = self.get_execution_details_formatted( row_count, MaxRows::Unlimited, query_start_time, @@ -170,4 +149,26 @@ impl PrintOptions { Ok(()) } + + // Returns the query execution details formatted + pub fn get_execution_details_formatted( + &self, + row_count: usize, + maxrows: MaxRows, + query_start_time: Instant, + ) -> String { + let nrows_shown_msg = match maxrows { + MaxRows::Limited(nrows) if nrows < row_count => { + format!("(First {nrows} displayed. Use --maxrows to adjust)") + } + _ => String::new(), + }; + + format!( + "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n", + row_count, + nrows_shown_msg, + query_start_time.elapsed().as_secs_f64() + ) + } } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 27cabf15afec..0c6a157142e9 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -54,5 +54,6 @@ fn cli_quick_test<'a>( ) { let mut cmd = Command::cargo_bin("datafusion-cli").unwrap(); cmd.args(args); + println!("Self output {:?}", cmd.output()); cmd.assert().stdout(predicate::eq(expected)); } From 25a7c06fa996bf2f24fb25effdf6df7821631e23 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 25 Feb 2025 16:17:28 +0800 Subject: [PATCH 3/7] polish code --- datafusion-cli/src/exec.rs | 290 ++++------------------------ datafusion-cli/src/print_format.rs | 148 ++++++++++++++ datafusion-cli/src/print_options.rs | 143 ++++++++++++-- 3 files changed, 317 insertions(+), 264 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 85d05213936a..6ed69c5bfe02 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -30,28 +30,28 @@ use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::util::display::{ArrayFormatter, ValueFormatter}; +use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; use datafusion::config::ConfigFileType; use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::memory_pool::MemoryConsumer; use datafusion::logical_expr::{DdlStatement, LogicalPlan}; use datafusion::physical_plan::execution_plan::EmissionType; +use datafusion::physical_plan::spill::get_record_batch_memory_size; use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties}; use datafusion::sql::parser::{DFParser, Statement}; +use datafusion::sql::sqlparser; use datafusion::sql::sqlparser::dialect::dialect_from_str; use futures::StreamExt; +use rustyline::error::ReadlineError; +use rustyline::Editor; use std::collections::HashMap; use std::fs::File; use std::io::prelude::*; use std::io::BufReader; - -use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; -use datafusion::execution::memory_pool::MemoryConsumer; -use datafusion::physical_plan::spill::get_record_batch_memory_size; -use datafusion::sql::sqlparser; -use rustyline::error::ReadlineError; -use rustyline::Editor; +use std::sync::Arc; use tokio::signal; /// run and execute SQL statements and commands, against a context with the given print options @@ -232,7 +232,10 @@ pub(super) async fn exec_and_print( let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { + let adjusted = + AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement); let plan = create_plan(ctx, statement).await?; + let adjusted = adjusted.with_plan(&plan); let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; @@ -242,8 +245,11 @@ pub(super) async fn exec_and_print( .with_can_spill(false) .register(task_ctx.memory_pool()); + let is_unbounded = physical_plan.boundedness().is_unbounded(); + let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?; + // Both bounded and unbounded streams are streaming prints - if print_options.format != PrintFormat::Table { + if is_unbounded { if physical_plan.pipeline_behavior() == EmissionType::Final { return plan_err!( "The given query can generate a valid result only once \ @@ -252,264 +258,44 @@ pub(super) async fn exec_and_print( } // As the input stream comes, we can generate results. // However, memory safety is not guaranteed. - let stream = execute_stream(physical_plan, task_ctx.clone())?; - print_options.print_stream(stream, now).await?; + print_options + .print_stream(MaxRows::Unlimited, stream, now) + .await?; } else { // Bounded stream; collected results size is limited by the maxrows option let schema = physical_plan.schema(); - let mut stream = execute_stream(physical_plan, task_ctx.clone())?; let max_rows = match print_options.maxrows { MaxRows::Unlimited => usize::MAX, MaxRows::Limited(n) => n, }; - - let preview_limit: usize = 1000; - let mut preview_batches: Vec = vec![]; - let mut preview_row_count = 0_usize; - let mut total_count = 0_usize; - let mut precomputed_widths: Option> = None; - let mut header_printed = false; - let mut max_rows_reached = false; - let stdout = std::io::stdout(); let mut writer = stdout.lock(); - while let Some(batch) = stream.next().await { - let batch = batch?; - let batch_rows = batch.num_rows(); - - if !max_rows_reached { - if total_count < max_rows { - if total_count + batch_rows > max_rows { - let needed = max_rows - total_count; - let batch_to_print = batch.slice(0, needed); - process_batch( - &batch_to_print, - schema.clone(), - &mut preview_batches, - &mut preview_row_count, - preview_limit, - &mut precomputed_widths, - &mut header_printed, - &mut writer, - )?; - if precomputed_widths.is_none() { - let widths = compute_column_widths( - &preview_batches, - schema.clone(), - )?; - precomputed_widths = Some(widths.clone()); - if !header_printed { - print_header(&schema, &widths, &mut writer)?; - header_printed = true; - } - for preview_batch in preview_batches.drain(..) { - print_batch_with_widths( - &preview_batch, - &widths, - &mut writer, - )?; - } - } - if let Some(ref widths) = precomputed_widths { - for _ in 0..3 { - print_dotted_line(widths, &mut writer)?; - } - print_bottom_border(widths, &mut writer)?; - } - max_rows_reached = true; - } else { - process_batch( - &batch, - schema.clone(), - &mut preview_batches, - &mut preview_row_count, - preview_limit, - &mut precomputed_widths, - &mut header_printed, - &mut writer, - )?; - } - } - } - total_count += batch_rows; - } - - if !max_rows_reached { - if precomputed_widths.is_none() && !preview_batches.is_empty() { - let widths = compute_column_widths(&preview_batches, schema.clone())?; - precomputed_widths = Some(widths); - if !header_printed { - print_header( - &schema, - precomputed_widths.as_ref().unwrap(), - &mut writer, - )?; - header_printed = true; - } - } - if let Some(ref widths) = precomputed_widths { - print_bottom_border(widths, &mut writer)?; - } - } - - let formatted_exec_details = print_options.get_execution_details_formatted( - total_count, - print_options.maxrows, - now, - ); - if !print_options.quiet { - writeln!(writer, "{}", formatted_exec_details)?; - } - reservation.free(); - } - } - - Ok(()) -} - -fn process_batch( - batch: &RecordBatch, - schema: SchemaRef, - preview_batches: &mut Vec, - preview_row_count: &mut usize, - preview_limit: usize, - precomputed_widths: &mut Option>, - header_printed: &mut bool, - writer: &mut impl std::io::Write, -) -> Result<()> { - if precomputed_widths.is_none() { - preview_batches.push(batch.clone()); - *preview_row_count += batch.num_rows(); - if *preview_row_count >= preview_limit { - let widths = compute_column_widths(preview_batches, schema.clone())?; - *precomputed_widths = Some(widths.clone()); - print_header(&schema, &widths, writer)?; - *header_printed = true; - for preview_batch in preview_batches.drain(..) { - print_batch_with_widths(&preview_batch, &widths, writer)?; + // If we don't want to print the table, we should use the streaming print same as above + // todo json and csv need to be improved + if print_options.format != PrintFormat::Table + && print_options.format != PrintFormat::Automatic + { + print_options + .print_stream(print_options.maxrows, stream, now) + .await?; + continue; } - } - } else { - let widths = precomputed_widths.as_ref().unwrap(); - if !*header_printed { - print_header(&schema, widths, writer)?; - *header_printed = true; - } - print_batch_with_widths(batch, widths, writer)?; - } - Ok(()) -} -/// Compute the maximum width of each column in the given batches -fn compute_column_widths( - batches: &Vec, - schema: SchemaRef, -) -> Result> { - // 以表头作为初始宽度 - let mut widths: Vec = schema.fields().iter().map(|f| f.name().len()).collect(); - // 遍历所有 batch 计算各列宽度 - for batch in batches { - let formatters = batch - .columns() - .iter() - .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) - .collect::, ArrowError>>()?; - for row in 0..batch.num_rows() { - for (i, formatter) in formatters.iter().enumerate() { - let cell = formatter.value(row); - widths[i] = widths[i].max(cell.to_string().len()); - } + // into_inner will finalize the print options to table if it's automatic + adjusted + .into_inner() + .print_table_batch( + &print_options, + schema, + &mut stream, + max_rows, + &mut writer, + now, + ) + .await?; } } - Ok(widths) -} - -/// Print the header of a table -fn print_header( - schema: &SchemaRef, - widths: &Vec, - writer: &mut impl std::io::Write, -) -> Result<()> { - print_border(widths, writer)?; - - let header: Vec = schema - .fields() - .iter() - .enumerate() - .map(|(i, field)| pad_cell(field.name(), widths[i])) - .collect(); - writeln!(writer, "| {} |", header.join(" | "))?; - - print_border(widths, writer)?; - Ok(()) -} - -fn print_border(widths: &Vec, writer: &mut impl std::io::Write) -> Result<()> { - let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); - writeln!(writer, "+{}+", cells.join("+"))?; - Ok(()) -} - -/// Print a batch of records with the given widths for each column -fn print_batch_with_widths( - batch: &RecordBatch, - widths: &Vec, - writer: &mut impl std::io::Write, -) -> Result<()> { - let formatters = batch - .columns() - .iter() - .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) - .collect::, ArrowError>>()?; - for row in 0..batch.num_rows() { - let cells: Vec = formatters - .iter() - .enumerate() - .map(|(i, formatter)| pad_value(&formatter.value(row), widths[i])) - .collect(); - // 修改这里:在前后各加一个 "|" 以保持和 header 格式一致 - writeln!(writer, "| {} |", cells.join(" | "))?; - } - Ok(()) -} - -/// 为单元格内容补齐空格(左对齐) -fn pad_cell(cell: &str, width: usize) -> String { - format!("{: String { - let s = formatter.try_to_string().unwrap_or_default(); - format!("{:, - writer: &mut impl std::io::Write, -) -> Result<()> { - // 构造每个单元格,点号左对齐,长度与对应宽度相同 - let cells: Vec = widths - .iter() - .map(|&w| format!(" {: , - writer: &mut impl std::io::Write, -) -> Result<()> { - // 构造每个单元格对应的边框部分,例如 "+------------+" - let cells: Vec = widths - .iter() - .map(|&w| "-".repeat(w + 2)) // 加2可以对齐左右两侧的空格 - .collect(); - writeln!(writer, "+{}+", cells.join("+"))?; Ok(()) } diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 1fc949593512..77732a2f9ff7 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -23,8 +23,10 @@ use crate::print_options::MaxRows; use arrow::csv::writer::WriterBuilder; use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; use arrow::json::{ArrayWriter, LineDelimitedWriter}; use arrow::record_batch::RecordBatch; +use arrow::util::display::{ArrayFormatter, ValueFormatter}; use arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; use datafusion::error::Result; @@ -209,6 +211,152 @@ impl PrintFormat { } Ok(()) } + + pub fn process_batch( + &self, + batch: &RecordBatch, + schema: SchemaRef, + preview_batches: &mut Vec, + preview_row_count: &mut usize, + preview_limit: usize, + precomputed_widths: &mut Option>, + header_printed: &mut bool, + writer: &mut dyn std::io::Write, + ) -> Result<()> { + if precomputed_widths.is_none() { + preview_batches.push(batch.clone()); + *preview_row_count += batch.num_rows(); + if *preview_row_count >= preview_limit { + let widths = + Self::compute_column_widths(self, preview_batches, schema.clone())?; + *precomputed_widths = Some(widths.clone()); + Self::print_header(self, &schema, &widths, writer)?; + *header_printed = true; + for preview_batch in preview_batches.drain(..) { + Self::print_batch_with_widths(self, &preview_batch, &widths, writer)?; + } + } + } else { + let widths = precomputed_widths.as_ref().unwrap(); + if !*header_printed { + Self::print_header(self, &schema, widths, writer)?; + *header_printed = true; + } + Self::print_batch_with_widths(self, batch, widths, writer)?; + } + Ok(()) + } + + pub fn compute_column_widths( + &self, + batches: &Vec, + schema: SchemaRef, + ) -> Result> { + let mut widths: Vec = + schema.fields().iter().map(|f| f.name().len()).collect(); + for batch in batches { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + for (i, formatter) in formatters.iter().enumerate() { + let cell = formatter.value(row); + widths[i] = widths[i].max(cell.to_string().len()); + } + } + } + Ok(widths) + } + + pub fn print_header( + &self, + schema: &SchemaRef, + widths: &Vec, + writer: &mut dyn std::io::Write, + ) -> Result<()> { + Self::print_border(widths, writer)?; + + let header: Vec = schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| Self::pad_cell(field.name(), widths[i])) + .collect(); + writeln!(writer, "| {} |", header.join(" | "))?; + + Self::print_border(widths, writer)?; + Ok(()) + } + + pub fn print_batch_with_widths( + &self, + batch: &RecordBatch, + widths: &Vec, + writer: &mut dyn std::io::Write, + ) -> Result<()> { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + let cells: Vec = formatters + .iter() + .enumerate() + .map(|(i, formatter)| Self::pad_value(&formatter.value(row), widths[i])) + .collect(); + writeln!(writer, "| {} |", cells.join(" | "))?; + } + Ok(()) + } + + // 辅助函数:打印一行点行,用于表示被省略的行 + pub fn print_dotted_line( + &self, + widths: &Vec, + writer: &mut dyn std::io::Write, + ) -> Result<()> { + // 构造每个单元格,点号左对齐,长度与对应宽度相同 + let cells: Vec = widths + .iter() + .map(|&w| format!(" {: , + writer: &mut dyn std::io::Write, + ) -> Result<()> { + // 构造每个单元格对应的边框部分,例如 "+------------+" + let cells: Vec = widths + .iter() + .map(|&w| "-".repeat(w + 2)) // 加2可以对齐左右两侧的空格 + .collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) + } + + fn print_border(widths: &Vec, writer: &mut dyn std::io::Write) -> Result<()> { + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) + } + + fn pad_cell(cell: &str, width: usize) -> String { + format!("{: String { + let s = formatter.try_to_string().unwrap_or_default(); + format!("{: Result<()> { + let preview_limit: usize = 1000; + let mut preview_batches: Vec = vec![]; + let mut preview_row_count = 0_usize; + let mut total_count = 0_usize; + let mut precomputed_widths: Option> = None; + let mut header_printed = false; + let mut max_rows_reached = false; + + while let Some(batch) = stream.next().await { + let batch = batch?; + let batch_rows = batch.num_rows(); + + if !max_rows_reached { + if total_count < max_rows { + if total_count + batch_rows > max_rows { + let needed = max_rows - total_count; + let batch_to_print = batch.slice(0, needed); + print_options.format.process_batch( + &batch_to_print, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + writer, + )?; + if precomputed_widths.is_none() { + let widths = print_options.format.compute_column_widths( + &preview_batches, + schema.clone(), + )?; + precomputed_widths = Some(widths.clone()); + if !header_printed { + print_options + .format + .print_header(&schema, &widths, writer)?; + header_printed = true; + } + for preview_batch in preview_batches.drain(..) { + print_options.format.print_batch_with_widths( + &preview_batch, + &widths, + writer, + )?; + } + } + if let Some(ref widths) = precomputed_widths { + for _ in 0..3 { + print_options.format.print_dotted_line(widths, writer)?; + } + print_options.format.print_bottom_border(widths, writer)?; + } + max_rows_reached = true; + } else { + print_options.format.process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + writer, + )?; + } + } + } + total_count += batch_rows; + } + + if !max_rows_reached { + if precomputed_widths.is_none() && !preview_batches.is_empty() { + let widths = print_options + .format + .compute_column_widths(&preview_batches, schema.clone())?; + precomputed_widths = Some(widths); + if !header_printed { + print_options.format.print_header( + &schema, + precomputed_widths.as_ref().unwrap(), + writer, + )?; + header_printed = true; + } + } + if let Some(ref widths) = precomputed_widths { + print_options.format.print_bottom_border(widths, writer)?; + } + } + + // 打印执行详情(例如总行数、耗时等) + let formatted_exec_details = print_options.get_execution_details_formatted( + total_count, + print_options.maxrows, + now, // 查询开始时间或当前时间 + ); + if !print_options.quiet { + writeln!(writer, "{}", formatted_exec_details)?; + } + + Ok(()) + } + /// Print the stream to stdout using the specified format pub async fn print_stream( &self, + max_rows: MaxRows, mut stream: Pin>, query_start_time: Instant, ) -> Result<()> { @@ -118,6 +233,11 @@ impl PrintOptions { )); }; + let max_count = match self.maxrows { + MaxRows::Unlimited => usize::MAX, + MaxRows::Limited(n) => n, + }; + let stdout = std::io::stdout(); let mut writer = stdout.lock(); @@ -127,21 +247,20 @@ impl PrintOptions { while let Some(maybe_batch) = stream.next().await { let batch = maybe_batch?; row_count += batch.num_rows(); - self.format.print_batches( - &mut writer, - batch.schema(), - &[batch], - MaxRows::Unlimited, - with_header, - )?; + if row_count < max_count || (with_header && row_count > max_count) { + self.format.print_batches( + &mut writer, + batch.schema(), + &[batch], + max_rows, + with_header, + )?; + } with_header = false; } - let formatted_exec_details = self.get_execution_details_formatted( - row_count, - MaxRows::Unlimited, - query_start_time, - ); + let formatted_exec_details = + self.get_execution_details_formatted(row_count, max_rows, query_start_time); if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; From 7158684ab9969b18f2a790b52fae606e1829336e Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 25 Feb 2025 17:17:58 +0800 Subject: [PATCH 4/7] polish code --- datafusion-cli/src/exec.rs | 15 +--- datafusion-cli/src/print_format.rs | 18 ++--- datafusion-cli/src/print_options.rs | 102 +++++++++++++--------------- 3 files changed, 57 insertions(+), 78 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 6ed69c5bfe02..1675a65b4e1b 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -26,25 +26,17 @@ use crate::{ object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, }; -use arrow::array::RecordBatch; -use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; -use arrow::util::display::{ArrayFormatter, ValueFormatter}; -use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; use datafusion::config::ConfigFileType; use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; -use datafusion::execution::memory_pool::MemoryConsumer; use datafusion::logical_expr::{DdlStatement, LogicalPlan}; use datafusion::physical_plan::execution_plan::EmissionType; -use datafusion::physical_plan::spill::get_record_batch_memory_size; use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties}; use datafusion::sql::parser::{DFParser, Statement}; use datafusion::sql::sqlparser; use datafusion::sql::sqlparser::dialect::dialect_from_str; -use futures::StreamExt; use rustyline::error::ReadlineError; use rustyline::Editor; use std::collections::HashMap; @@ -240,11 +232,6 @@ pub(super) async fn exec_and_print( let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; - // Track memory usage for the query result if it's bounded - let mut reservation = MemoryConsumer::new("DataFusion-Cli") - .with_can_spill(false) - .register(task_ctx.memory_pool()); - let is_unbounded = physical_plan.boundedness().is_unbounded(); let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?; @@ -286,7 +273,7 @@ pub(super) async fn exec_and_print( adjusted .into_inner() .print_table_batch( - &print_options, + print_options, schema, &mut stream, max_rows, diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 77732a2f9ff7..cc38dbdf2ed4 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -212,6 +212,7 @@ impl PrintFormat { Ok(()) } + #[allow(clippy::too_many_arguments)] pub fn process_batch( &self, batch: &RecordBatch, @@ -273,7 +274,7 @@ impl PrintFormat { pub fn print_header( &self, schema: &SchemaRef, - widths: &Vec, + widths: &[usize], writer: &mut dyn std::io::Write, ) -> Result<()> { Self::print_border(widths, writer)?; @@ -293,7 +294,7 @@ impl PrintFormat { pub fn print_batch_with_widths( &self, batch: &RecordBatch, - widths: &Vec, + widths: &[usize], writer: &mut dyn std::io::Write, ) -> Result<()> { let formatters = batch @@ -312,38 +313,33 @@ impl PrintFormat { Ok(()) } - // 辅助函数:打印一行点行,用于表示被省略的行 pub fn print_dotted_line( &self, - widths: &Vec, + widths: &[usize], writer: &mut dyn std::io::Write, ) -> Result<()> { - // 构造每个单元格,点号左对齐,长度与对应宽度相同 let cells: Vec = widths .iter() .map(|&w| format!(" {: , + widths: &[usize], writer: &mut dyn std::io::Write, ) -> Result<()> { - // 构造每个单元格对应的边框部分,例如 "+------------+" let cells: Vec = widths .iter() - .map(|&w| "-".repeat(w + 2)) // 加2可以对齐左右两侧的空格 + .map(|&w| "-".repeat(w + 2)) .collect(); writeln!(writer, "+{}+", cells.join("+"))?; Ok(()) } - fn print_border(widths: &Vec, writer: &mut dyn std::io::Write) -> Result<()> { + fn print_border(widths: &[usize], writer: &mut dyn std::io::Write) -> Result<()> { let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); writeln!(writer, "+{}+", cells.join("+"))?; Ok(()) diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 9b27481774c6..81e5bdce0931 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -128,62 +128,60 @@ impl PrintOptions { let batch = batch?; let batch_rows = batch.num_rows(); - if !max_rows_reached { - if total_count < max_rows { - if total_count + batch_rows > max_rows { - let needed = max_rows - total_count; - let batch_to_print = batch.slice(0, needed); - print_options.format.process_batch( - &batch_to_print, - schema.clone(), - &mut preview_batches, - &mut preview_row_count, - preview_limit, - &mut precomputed_widths, - &mut header_printed, - writer, - )?; - if precomputed_widths.is_none() { - let widths = print_options.format.compute_column_widths( - &preview_batches, - schema.clone(), + if !max_rows_reached && total_count < max_rows { + if total_count + batch_rows > max_rows { + let needed = max_rows - total_count; + let batch_to_print = batch.slice(0, needed); + print_options.format.process_batch( + &batch_to_print, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + writer, + )?; + if precomputed_widths.is_none() { + let widths = print_options + .format + .compute_column_widths(&preview_batches, schema.clone())?; + precomputed_widths = Some(widths.clone()); + if !header_printed { + print_options + .format + .print_header(&schema, &widths, writer)?; + header_printed = true; + } + for preview_batch in preview_batches.drain(..) { + print_options.format.print_batch_with_widths( + &preview_batch, + &widths, + writer, )?; - precomputed_widths = Some(widths.clone()); - if !header_printed { - print_options - .format - .print_header(&schema, &widths, writer)?; - header_printed = true; - } - for preview_batch in preview_batches.drain(..) { - print_options.format.print_batch_with_widths( - &preview_batch, - &widths, - writer, - )?; - } } - if let Some(ref widths) = precomputed_widths { - for _ in 0..3 { - print_options.format.print_dotted_line(widths, writer)?; - } - print_options.format.print_bottom_border(widths, writer)?; + } + if let Some(ref widths) = precomputed_widths { + for _ in 0..3 { + print_options.format.print_dotted_line(widths, writer)?; } - max_rows_reached = true; - } else { - print_options.format.process_batch( - &batch, - schema.clone(), - &mut preview_batches, - &mut preview_row_count, - preview_limit, - &mut precomputed_widths, - &mut header_printed, - writer, - )?; + print_options.format.print_bottom_border(widths, writer)?; } + max_rows_reached = true; + } else { + print_options.format.process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + writer, + )?; } } + total_count += batch_rows; } @@ -199,7 +197,6 @@ impl PrintOptions { precomputed_widths.as_ref().unwrap(), writer, )?; - header_printed = true; } } if let Some(ref widths) = precomputed_widths { @@ -207,11 +204,10 @@ impl PrintOptions { } } - // 打印执行详情(例如总行数、耗时等) let formatted_exec_details = print_options.get_execution_details_formatted( total_count, print_options.maxrows, - now, // 查询开始时间或当前时间 + now, ); if !print_options.quiet { writeln!(writer, "{}", formatted_exec_details)?; From a214776f9bb2d46154e61eaf0effcfe41b3f6835 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 26 Feb 2025 00:34:09 +0800 Subject: [PATCH 5/7] Add unit test --- datafusion-cli/src/exec.rs | 1 - datafusion-cli/src/print_format.rs | 346 ++++++++++++++++++++++++++++ datafusion-cli/src/print_options.rs | 33 ++- 3 files changed, 370 insertions(+), 10 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 1675a65b4e1b..31af16587626 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -259,7 +259,6 @@ pub(super) async fn exec_and_print( let mut writer = stdout.lock(); // If we don't want to print the table, we should use the streaming print same as above - // todo json and csv need to be improved if print_options.format != PrintFormat::Table && print_options.format != PrintFormat::Automatic { diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index cc38dbdf2ed4..66829d9d62a7 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -683,6 +683,339 @@ mod tests { .run(); } + #[test] + fn test_compute_column_widths() { + let schema = three_column_schema(); + let batches = vec![three_column_batch()]; + let format = PrintFormat::Table; + let widths = format.compute_column_widths(&batches, schema).unwrap(); + assert_eq!(widths, vec![1, 1, 1]); + + let schema = one_column_schema(); + let batches = vec![one_column_batch()]; + let format = PrintFormat::Table; + let widths = format.compute_column_widths(&batches, schema).unwrap(); + assert_eq!(widths, vec![1]); + + let schema = three_column_schema(); + let batches = vec![three_column_batch_with_widths()]; + let format = PrintFormat::Table; + let widths = format.compute_column_widths(&batches, schema).unwrap(); + assert_eq!(widths, [7, 5, 6]); + } + + #[test] + fn test_print_header() { + let schema = three_column_schema(); + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_header(&schema, &widths, &mut writer).unwrap(); + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batch_with_same_widths() { + let batch = three_column_batch(); + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_batch_with_widths(&batch, &widths, &mut writer).unwrap(); + let expected = &[ + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batch_with_different_widths() { + let batch = three_column_batch_with_widths(); + let widths = vec![7, 5, 6]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_batch_with_widths(&batch, &widths, &mut writer).unwrap(); + let expected = &[ + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_dotted_line() { + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_dotted_line(&widths, &mut writer).unwrap(); + let expected = &[ + "| . | . | . |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_bottom_border() { + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_bottom_border(&widths, &mut writer).unwrap(); + let expected = &[ + "+---+---+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_maxrows() { + let batch = one_column_batch(); + let schema = one_column_schema(); + let format = PrintFormat::Table; + + // should print out entire output with no truncation if unlimited or + // limit greater than number of batches or equal to the number of batches + for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] { + let mut writer = Vec::new(); + format + .print_batches(&mut writer, schema.clone(), &[batch.clone()], max_rows, true) + .unwrap(); + let expected = &[ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| 2 |", + "| 3 |", + "+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + // should truncate output if limit is less than number of batches + let mut writer = Vec::new(); + format + .print_batches(&mut writer, schema.clone(), &[batch.clone()], MaxRows::Limited(1), true) + .unwrap(); + let expected = &[ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| . |", + "| . |", + "| . |", + "+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + // test print_batch with different batch widths + // and preview count is less than the first batch + #[test] + fn test_print_batches_with_preview_count_less_than_first_batch() { + let batch = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + let preview_limit = 2; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + + #[test] + fn test_print_batches_with_preview_and_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + // preview limit is less than the first batch + // so the second batch if it's width is greater than the first batch, it will be unformatted + let preview_limit = 2; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_batch( + &batch2, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_cover_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + // preview limit is greater than the first batch + let preview_limit = 4; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_batch( + &batch2, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + #[derive(Debug)] struct PrintBatchesTest { format: PrintFormat, @@ -816,6 +1149,19 @@ mod tests { .unwrap() } + /// Return a batch with three columns and three rows, but with different widths + fn three_column_batch_with_widths() -> RecordBatch { + RecordBatch::try_new( + three_column_schema(), + vec![ + Arc::new(Int32Array::from(vec![1, 2222222, 3])), + Arc::new(Int32Array::from(vec![42222, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 922222])), + ], + ) + .unwrap() + } + /// Return a schema with one column fn one_column_schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 81e5bdce0931..092483faed63 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -239,19 +239,34 @@ impl PrintOptions { let mut row_count = 0_usize; let mut with_header = true; + let mut max_rows_reached = false; while let Some(maybe_batch) = stream.next().await { let batch = maybe_batch?; - row_count += batch.num_rows(); - if row_count < max_count || (with_header && row_count > max_count) { - self.format.print_batches( - &mut writer, - batch.schema(), - &[batch], - max_rows, - with_header, - )?; + let curr_batch_rows = batch.num_rows(); + if !max_rows_reached && row_count < max_count { + if row_count + curr_batch_rows > max_count { + let needed = max_count - row_count; + let batch_to_print = batch.slice(0, needed); + self.format.print_batches( + &mut writer, + batch.schema(), + &[batch_to_print], + max_rows, + with_header, + )?; + max_rows_reached = true; + } else { + self.format.print_batches( + &mut writer, + batch.schema(), + &[batch], + max_rows, + with_header, + )?; + } } + row_count += curr_batch_rows; with_header = false; } From 236b2b2460af73e9b115e8461e62f6b997785788 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 26 Feb 2025 00:35:14 +0800 Subject: [PATCH 6/7] fmt --- datafusion-cli/src/print_format.rs | 69 ++++++++++++------------------ 1 file changed, 28 insertions(+), 41 deletions(-) diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 66829d9d62a7..ed3f03781c43 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -331,10 +331,7 @@ impl PrintFormat { widths: &[usize], writer: &mut dyn std::io::Write, ) -> Result<()> { - let cells: Vec = widths - .iter() - .map(|&w| "-".repeat(w + 2)) - .collect(); + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); writeln!(writer, "+{}+", cells.join("+"))?; Ok(()) } @@ -697,7 +694,7 @@ mod tests { let widths = format.compute_column_widths(&batches, schema).unwrap(); assert_eq!(widths, vec![1]); - let schema = three_column_schema(); + let schema = three_column_schema(); let batches = vec![three_column_batch_with_widths()]; let format = PrintFormat::Table; let widths = format.compute_column_widths(&batches, schema).unwrap(); @@ -711,11 +708,7 @@ mod tests { let mut writer = Vec::new(); let format = PrintFormat::Table; format.print_header(&schema, &widths, &mut writer).unwrap(); - let expected = &[ - "+---+---+---+", - "| a | b | c |", - "+---+---+---+", - ]; + let expected = &["+---+---+---+", "| a | b | c |", "+---+---+---+"]; let binding = String::from_utf8(writer.clone()).unwrap(); let actual: Vec<_> = binding.trim_end().split('\n').collect(); assert_eq!(actual, expected); @@ -727,12 +720,10 @@ mod tests { let widths = vec![1, 1, 1]; let mut writer = Vec::new(); let format = PrintFormat::Table; - format.print_batch_with_widths(&batch, &widths, &mut writer).unwrap(); - let expected = &[ - "| 1 | 4 | 7 |", - "| 2 | 5 | 8 |", - "| 3 | 6 | 9 |", - ]; + format + .print_batch_with_widths(&batch, &widths, &mut writer) + .unwrap(); + let expected = &["| 1 | 4 | 7 |", "| 2 | 5 | 8 |", "| 3 | 6 | 9 |"]; let binding = String::from_utf8(writer.clone()).unwrap(); let actual: Vec<_> = binding.trim_end().split('\n').collect(); assert_eq!(actual, expected); @@ -744,7 +735,9 @@ mod tests { let widths = vec![7, 5, 6]; let mut writer = Vec::new(); let format = PrintFormat::Table; - format.print_batch_with_widths(&batch, &widths, &mut writer).unwrap(); + format + .print_batch_with_widths(&batch, &widths, &mut writer) + .unwrap(); let expected = &[ "| 1 | 42222 | 7 |", "| 2222222 | 5 | 8 |", @@ -761,9 +754,7 @@ mod tests { let mut writer = Vec::new(); let format = PrintFormat::Table; format.print_dotted_line(&widths, &mut writer).unwrap(); - let expected = &[ - "| . | . | . |", - ]; + let expected = &["| . | . | . |"]; let binding = String::from_utf8(writer.clone()).unwrap(); let actual: Vec<_> = binding.trim_end().split('\n').collect(); assert_eq!(actual, expected); @@ -775,9 +766,7 @@ mod tests { let mut writer = Vec::new(); let format = PrintFormat::Table; format.print_bottom_border(&widths, &mut writer).unwrap(); - let expected = &[ - "+---+---+---+", - ]; + let expected = &["+---+---+---+"]; let binding = String::from_utf8(writer.clone()).unwrap(); let actual: Vec<_> = binding.trim_end().split('\n').collect(); assert_eq!(actual, expected); @@ -794,16 +783,16 @@ mod tests { for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] { let mut writer = Vec::new(); format - .print_batches(&mut writer, schema.clone(), &[batch.clone()], max_rows, true) + .print_batches( + &mut writer, + schema.clone(), + &[batch.clone()], + max_rows, + true, + ) .unwrap(); let expected = &[ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| 2 |", - "| 3 |", - "+---+", + "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+", ]; let binding = String::from_utf8(writer.clone()).unwrap(); let actual: Vec<_> = binding.trim_end().split('\n').collect(); @@ -813,17 +802,16 @@ mod tests { // should truncate output if limit is less than number of batches let mut writer = Vec::new(); format - .print_batches(&mut writer, schema.clone(), &[batch.clone()], MaxRows::Limited(1), true) + .print_batches( + &mut writer, + schema.clone(), + &[batch.clone()], + MaxRows::Limited(1), + true, + ) .unwrap(); let expected = &[ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| . |", - "| . |", - "| . |", - "+---+", + "+---+", "| a |", "+---+", "| 1 |", "| . |", "| . |", "| . |", "+---+", ]; let binding = String::from_utf8(writer.clone()).unwrap(); let actual: Vec<_> = binding.trim_end().split('\n').collect(); @@ -870,7 +858,6 @@ mod tests { assert_eq!(actual, expected); } - #[test] fn test_print_batches_with_preview_and_later_batches() { let batch1 = three_column_batch(); From 303fe1c6738ceb0c712cd53fad4c1067c8efbd05 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 26 Feb 2025 00:38:39 +0800 Subject: [PATCH 7/7] remove println --- datafusion-cli/tests/cli_integration.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 0c6a157142e9..27cabf15afec 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -54,6 +54,5 @@ fn cli_quick_test<'a>( ) { let mut cmd = Command::cargo_bin("datafusion-cli").unwrap(); cmd.args(args); - println!("Self output {:?}", cmd.output()); cmd.assert().stdout(predicate::eq(expected)); }