diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index d560dee987f5..31af16587626 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -26,12 +26,6 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
-use futures::StreamExt;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
@@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
-use datafusion::sql::sqlparser::dialect::dialect_from_str;
-
-use datafusion::execution::memory_pool::MemoryConsumer;
-use datafusion::physical_plan::spill::get_record_batch_memory_size;
 use datafusion::sql::sqlparser;
+use datafusion::sql::sqlparser::dialect::dialect_from_str;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::sync::Arc;
 use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the given print options
@@ -230,18 +226,17 @@ pub(super) async fn exec_and_print(
     for statement in statements {
         let adjusted =
             AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
-
         let plan = create_plan(ctx, statement).await?;
         let adjusted = adjusted.with_plan(&plan);
 
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        // Track memory usage for the query result if it's bounded
-        let mut reservation =
-            MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+        let is_unbounded = physical_plan.boundedness().is_unbounded();
+        let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;
 
-        if physical_plan.boundedness().is_unbounded() {
+        // Both bounded and unbounded streams are streaming prints
+        if is_unbounded {
             if physical_plan.pipeline_behavior() == EmissionType::Final {
                 return plan_err!(
                     "The given query can generate a valid result only once \
@@ -250,37 +245,43 @@
             }
             // As the input stream comes, we can generate results.
             // However, memory safety is not guaranteed.
-            let stream = execute_stream(physical_plan, task_ctx.clone())?;
-            print_options.print_stream(stream, now).await?;
+            print_options
+                .print_stream(MaxRows::Unlimited, stream, now)
+                .await?;
         } else {
             // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
-            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
-            let mut results = vec![];
-            let mut row_count = 0_usize;
             let max_rows = match print_options.maxrows {
                 MaxRows::Unlimited => usize::MAX,
                 MaxRows::Limited(n) => n,
             };
 
-            while let Some(batch) = stream.next().await {
-                let batch = batch?;
-                let curr_num_rows = batch.num_rows();
-                // Stop collecting results if the number of rows exceeds the limit
-                // results batch should include the last batch that exceeds the limit
-                if row_count < max_rows + curr_num_rows {
-                    // Try to grow the reservation to accommodate the batch in memory
-                    reservation.try_grow(get_record_batch_memory_size(&batch))?;
-                    results.push(batch);
-                }
-                row_count += curr_num_rows;
+            let stdout = std::io::stdout();
+            let mut writer = stdout.lock();
+
+            // If the output format is not a table, use the same streaming print as above
+            if print_options.format != PrintFormat::Table
+                && print_options.format != PrintFormat::Automatic
+            {
+                print_options
+                    .print_stream(print_options.maxrows, stream, now)
+                    .await?;
+                continue;
             }
+
+            // into_inner will finalize the print format to table if it's automatic
             adjusted
                 .into_inner()
-                .print_batches(schema, &results, now, row_count)?;
-            reservation.free();
+                .print_table_batch(
+                    print_options,
+                    schema,
+                    &mut stream,
+                    max_rows,
+                    &mut writer,
+                    now,
+                )
+                .await?;
         }
     }
-
     Ok(())
 }
diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs
index 1fc949593512..ed3f03781c43 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -23,8 +23,10 @@ use crate::print_options::MaxRows;
 
 use arrow::csv::writer::WriterBuilder;
 use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
 use arrow::json::{ArrayWriter, LineDelimitedWriter};
 use arrow::record_batch::RecordBatch;
+use arrow::util::display::{ArrayFormatter, ValueFormatter};
 use arrow::util::pretty::pretty_format_batches_with_options;
 use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
 use datafusion::error::Result;
@@ -209,6 +211,145 @@ impl PrintFormat {
         }
         Ok(())
     }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn process_batch(
+        &self,
+        batch: &RecordBatch,
+        schema: SchemaRef,
+        preview_batches: &mut Vec<RecordBatch>,
+        preview_row_count: &mut usize,
+        preview_limit: usize,
+        precomputed_widths: &mut Option<Vec<usize>>,
+        header_printed: &mut bool,
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        if precomputed_widths.is_none() {
+            preview_batches.push(batch.clone());
+            *preview_row_count += batch.num_rows();
+            if *preview_row_count >= preview_limit {
+                let widths =
+                    Self::compute_column_widths(self, preview_batches, schema.clone())?;
+                *precomputed_widths = Some(widths.clone());
+                Self::print_header(self, &schema, &widths, writer)?;
+                *header_printed = true;
+                for preview_batch in preview_batches.drain(..) {
+                    Self::print_batch_with_widths(self, &preview_batch, &widths, writer)?;
+                }
+            }
+        } else {
+            let widths = precomputed_widths.as_ref().unwrap();
+            if !*header_printed {
+                Self::print_header(self, &schema, widths, writer)?;
+                *header_printed = true;
+            }
+            Self::print_batch_with_widths(self, batch, widths, writer)?;
+        }
+        Ok(())
+    }
+
+    pub fn compute_column_widths(
+        &self,
+        batches: &Vec<RecordBatch>,
+        schema: SchemaRef,
+    ) -> Result<Vec<usize>> {
+        let mut widths: Vec<usize> =
+            schema.fields().iter().map(|f| f.name().len()).collect();
+        for batch in batches {
+            let formatters = batch
+                .columns()
+                .iter()
+                .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS))
+                .collect::<Result<Vec<_>, ArrowError>>()?;
+            for row in 0..batch.num_rows() {
+                for (i, formatter) in formatters.iter().enumerate() {
+                    let cell = formatter.value(row);
+                    widths[i] = widths[i].max(cell.to_string().len());
+                }
+            }
+        }
+        Ok(widths)
+    }
+
+    pub fn print_header(
+        &self,
+        schema: &SchemaRef,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        Self::print_border(widths, writer)?;
+
+        let header: Vec<String> = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, field)| Self::pad_cell(field.name(), widths[i]))
+            .collect();
+        writeln!(writer, "| {} |", header.join(" | "))?;
+
+        Self::print_border(widths, writer)?;
+        Ok(())
+    }
+
+    pub fn print_batch_with_widths(
+        &self,
+        batch: &RecordBatch,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let formatters = batch
+            .columns()
+            .iter()
+            .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS))
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+        for row in 0..batch.num_rows() {
+            let cells: Vec<String> = formatters
+                .iter()
+                .enumerate()
+                .map(|(i, formatter)| Self::pad_value(&formatter.value(row), widths[i]))
+                .collect();
+            writeln!(writer, "| {} |", cells.join(" | "))?;
+        }
+        Ok(())
+    }
+
+    pub fn print_dotted_line(
+        &self,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let cells: Vec<String> = widths
+            .iter()
+            .map(|&w| format!(" {:<width$} ", ".", width = w))
+            .collect();
+        writeln!(writer, "|{}|", cells.join("|"))?;
+        Ok(())
+    }
+
+    pub fn print_bottom_border(
+        &self,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let cells: Vec<String> = widths.iter().map(|&w| "-".repeat(w + 2)).collect();
+        writeln!(writer, "+{}+", cells.join("+"))?;
+        Ok(())
+    }
+
+    fn print_border(widths: &[usize], writer: &mut dyn std::io::Write) -> Result<()> {
+        let cells: Vec<String> = widths.iter().map(|&w| "-".repeat(w + 2)).collect();
+        writeln!(writer, "+{}+", cells.join("+"))?;
+        Ok(())
+    }
+
+    fn pad_cell(cell: &str, width: usize) -> String {
+        format!("{:<width$}", cell)
+    }
+
+    fn pad_value(formatter: &ValueFormatter, width: usize) -> String {
+        let s = formatter.try_to_string().unwrap_or_default();
+        format!("{:<width$}", s)
+    }
 }
 
@@ ... @@ mod tests {
+    #[test]
+    fn test_print_header() {
+        let schema = three_column_schema();
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format.print_header(&schema, &widths, &mut writer).unwrap();
+        let expected = &["+---+---+---+", "| a | b | c |", "+---+---+---+"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_batch_with_same_widths() {
+        let batch = three_column_batch();
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format
+            .print_batch_with_widths(&batch, &widths, &mut writer)
+            .unwrap();
+        let expected = &["| 1 | 4 | 7 |", "| 2 | 5 | 8 |", "| 3 | 6 | 9 |"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_batch_with_different_widths() {
+        let batch = three_column_batch_with_widths();
+        let widths = vec![7, 5, 6];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format
+            .print_batch_with_widths(&batch, &widths, &mut writer)
+            .unwrap();
+        let expected = &[
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_dotted_line() {
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format.print_dotted_line(&widths, &mut writer).unwrap();
+        let expected = &["| . | . | . |"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_bottom_border() {
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format.print_bottom_border(&widths, &mut writer).unwrap();
+        let expected = &["+---+---+---+"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_batches_with_maxrows() {
+        let batch = one_column_batch();
+        let schema = one_column_schema();
+        let format = PrintFormat::Table;
+
+        // should print out entire output with no truncation if unlimited or
+        // limit greater than or equal to the number of rows
+        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
+            let mut writer = Vec::new();
+            format
+                .print_batches(
+                    &mut writer,
+                    schema.clone(),
+                    &[batch.clone()],
+                    max_rows,
+                    true,
+                )
+                .unwrap();
+            let expected = &[
+                "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
+            ];
+            let binding = String::from_utf8(writer.clone()).unwrap();
+            let actual: Vec<_> = binding.trim_end().split('\n').collect();
+            assert_eq!(actual, expected);
+        }
+
+        // should truncate output if limit is less than the number of rows
+        let mut writer = Vec::new();
+        format
+            .print_batches(
+                &mut writer,
+                schema.clone(),
+                &[batch.clone()],
+                MaxRows::Limited(1),
+                true,
+            )
+            .unwrap();
+        let expected = &[
+            "+---+", "| a |", "+---+", "| 1 |", "| . |", "| . |", "| . |", "+---+",
|", "+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + // test print_batch with different batch widths + // and preview count is less than the first batch + #[test] + fn test_print_batches_with_preview_count_less_than_first_batch() { + let batch = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + let preview_limit = 2; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_and_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + // preview limit is less than the first batch + // so the second batch if it's width is greater than the first batch, it will be unformatted + let preview_limit = 2; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_batch( + &batch2, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_cover_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + // preview limit is greater than the first batch + let preview_limit = 4; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + 
+        format
+            .process_batch(
+                &batch2,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        format
+            .process_batch(
+                &batch1,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        let expected = &[
+            "+---------+-------+--------+",
+            "| a       | b     | c      |",
+            "+---------+-------+--------+",
+            "| 1       | 4     | 7      |",
+            "| 2       | 5     | 8      |",
+            "| 3       | 6     | 9      |",
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+            "| 1       | 4     | 7      |",
+            "| 2       | 5     | 8      |",
+            "| 3       | 6     | 9      |",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
     #[derive(Debug)]
     struct PrintBatchesTest {
         format: PrintFormat,
@@ -672,6 +1136,19 @@ mod tests {
         .unwrap()
     }
 
+    /// Return a batch with three columns and three rows, but with different widths
+    fn three_column_batch_with_widths() -> RecordBatch {
+        RecordBatch::try_new(
+            three_column_schema(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2222222, 3])),
+                Arc::new(Int32Array::from(vec![42222, 5, 6])),
+                Arc::new(Int32Array::from(vec![7, 8, 922222])),
+            ],
+        )
+        .unwrap()
+    }
+
     /// Return a schema with one column
     fn one_column_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs
index 9557e783e8a7..092483faed63 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -29,6 +29,7 @@ use datafusion::common::DataFusionError;
 use datafusion::error::Result;
 use datafusion::physical_plan::RecordBatchStream;
+use datafusion::execution::SendableRecordBatchStream;
 
 use futures::StreamExt;
 
 #[derive(Debug, Clone, PartialEq, Copy)]
@@ -74,27 +75,6 @@ pub struct PrintOptions {
     pub color: bool,
 }
 
-// Returns the query execution details formatted
-fn get_execution_details_formatted(
-    row_count: usize,
-    maxrows: MaxRows,
-    query_start_time: Instant,
-) -> String {
-    let nrows_shown_msg = match maxrows {
-        MaxRows::Limited(nrows) if nrows < row_count => {
-            format!("(First {nrows} displayed. Use --maxrows to adjust)")
-        }
-        _ => String::new(),
-    };
-
-    format!(
-        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
-        row_count,
-        nrows_shown_msg,
-        query_start_time.elapsed().as_secs_f64()
-    )
-}
-
 impl PrintOptions {
     /// Print the batches to stdout using the specified format
     pub fn print_batches(
@@ -110,7 +90,7 @@ impl PrintOptions {
         self.format
             .print_batches(&mut writer, schema, batches, self.maxrows, true)?;
 
-        let formatted_exec_details = get_execution_details_formatted(
+        let formatted_exec_details = self.get_execution_details_formatted(
             row_count,
             if self.format == PrintFormat::Table {
                 self.maxrows
@@ -127,9 +107,119 @@ impl PrintOptions {
         Ok(())
     }
 
+    pub async fn print_table_batch(
+        &self,
+        print_options: &PrintOptions,
+        schema: SchemaRef,
+        stream: &mut SendableRecordBatchStream,
+        max_rows: usize,
+        writer: &mut dyn std::io::Write,
+        now: Instant,
+    ) -> Result<()> {
+        let preview_limit: usize = 1000;
+        let mut preview_batches: Vec<RecordBatch> = vec![];
+        let mut preview_row_count = 0_usize;
+        let mut total_count = 0_usize;
+        let mut precomputed_widths: Option<Vec<usize>> = None;
+        let mut header_printed = false;
+        let mut max_rows_reached = false;
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let batch_rows = batch.num_rows();
+
+            if !max_rows_reached && total_count < max_rows {
+                if total_count + batch_rows > max_rows {
+                    let needed = max_rows - total_count;
+                    let batch_to_print = batch.slice(0, needed);
+                    print_options.format.process_batch(
+                        &batch_to_print,
+                        schema.clone(),
+                        &mut preview_batches,
+                        &mut preview_row_count,
+                        preview_limit,
+                        &mut precomputed_widths,
+                        &mut header_printed,
+                        writer,
+                    )?;
+                    if precomputed_widths.is_none() {
+                        let widths = print_options
+                            .format
+                            .compute_column_widths(&preview_batches, schema.clone())?;
+                        precomputed_widths = Some(widths.clone());
+                        if !header_printed {
+                            print_options
+                                .format
+                                .print_header(&schema, &widths, writer)?;
+                            header_printed = true;
+                        }
+                        for preview_batch in preview_batches.drain(..) {
+                            print_options.format.print_batch_with_widths(
+                                &preview_batch,
+                                &widths,
+                                writer,
+                            )?;
+                        }
+                    }
+                    if let Some(ref widths) = precomputed_widths {
+                        for _ in 0..3 {
+                            print_options.format.print_dotted_line(widths, writer)?;
+                        }
+                        print_options.format.print_bottom_border(widths, writer)?;
+                    }
+                    max_rows_reached = true;
+                } else {
+                    print_options.format.process_batch(
+                        &batch,
+                        schema.clone(),
+                        &mut preview_batches,
+                        &mut preview_row_count,
+                        preview_limit,
+                        &mut precomputed_widths,
+                        &mut header_printed,
+                        writer,
+                    )?;
+                }
+            }
+
+            total_count += batch_rows;
+        }
+
+        if !max_rows_reached {
+            if precomputed_widths.is_none() && !preview_batches.is_empty() {
+                let widths = print_options
+                    .format
+                    .compute_column_widths(&preview_batches, schema.clone())?;
+                precomputed_widths = Some(widths);
+                if !header_printed {
+                    print_options.format.print_header(
+                        &schema,
+                        precomputed_widths.as_ref().unwrap(),
+                        writer,
+                    )?;
+                }
+            }
+            if let Some(ref widths) = precomputed_widths {
+                print_options.format.print_bottom_border(widths, writer)?;
+            }
+        }
+
+        let formatted_exec_details = print_options.get_execution_details_formatted(
+            total_count,
+            print_options.maxrows,
+            now,
+        );
+        if !print_options.quiet {
+            writeln!(writer, "{}", formatted_exec_details)?;
+        }
+
+        Ok(())
+    }
+
     /// Print the stream to stdout using the specified format
     pub async fn print_stream(
         &self,
+        max_rows: MaxRows,
         mut stream: Pin<Box<dyn RecordBatchStream>>,
         query_start_time: Instant,
     ) -> Result<()> {
@@ -139,30 +229,49 @@ impl PrintOptions {
             ));
         };
 
+        let max_count = match self.maxrows {
+            MaxRows::Unlimited => usize::MAX,
+            MaxRows::Limited(n) => n,
+        };
+
         let stdout = std::io::stdout();
         let mut writer = stdout.lock();
 
         let mut row_count = 0_usize;
         let mut with_header = true;
+        let mut max_rows_reached = false;
 
         while let Some(maybe_batch) = stream.next().await {
            let batch = maybe_batch?;
-            row_count += batch.num_rows();
-            self.format.print_batches(
-                &mut writer,
-                batch.schema(),
-                &[batch],
-                MaxRows::Unlimited,
-                with_header,
-            )?;
+            let curr_batch_rows = batch.num_rows();
+            if !max_rows_reached && row_count < max_count {
+                if row_count + curr_batch_rows > max_count {
+                    let needed = max_count - row_count;
+                    let batch_to_print = batch.slice(0, needed);
+                    self.format.print_batches(
+                        &mut writer,
+                        batch.schema(),
+                        &[batch_to_print],
+                        max_rows,
+                        with_header,
+                    )?;
+                    max_rows_reached = true;
+                } else {
+                    self.format.print_batches(
+                        &mut writer,
+                        batch.schema(),
+                        &[batch],
+                        max_rows,
+                        with_header,
+                    )?;
+                }
+            }
+            row_count += curr_batch_rows;
             with_header = false;
         }
 
-        let formatted_exec_details = get_execution_details_formatted(
-            row_count,
-            MaxRows::Unlimited,
-            query_start_time,
-        );
+        let formatted_exec_details =
+            self.get_execution_details_formatted(row_count, max_rows, query_start_time);
 
         if !self.quiet {
             writeln!(writer, "{formatted_exec_details}")?;
@@ -170,4 +279,26 @@
 
         Ok(())
     }
+
+    // Returns the query execution details formatted
+    pub fn get_execution_details_formatted(
+        &self,
+        row_count: usize,
+        maxrows: MaxRows,
+        query_start_time: Instant,
+    ) -> String {
+        let nrows_shown_msg = match maxrows {
+            MaxRows::Limited(nrows) if nrows < row_count => {
+                format!("(First {nrows} displayed. Use --maxrows to adjust)")
+            }
+            _ => String::new(),
+        };
+
+        format!(
+            "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
+            row_count,
+            nrows_shown_msg,
+            query_start_time.elapsed().as_secs_f64()
+        )
+    }
 }
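The table path above buffers up to `preview_limit` rows, derives the column widths from that preview, then streams every later batch with the fixed widths (switching to dotted rows and a bottom border once `max_rows` is reached). The following standalone sketch models only that preview-then-stream idea; it uses plain `Vec<String>` rows instead of Arrow RecordBatches, and all names in it are illustrative, not datafusion-cli or arrow APIs.

// Illustrative sketch only -- not part of the patch above.
fn print_border(widths: &[usize]) {
    let cells: Vec<String> = widths.iter().map(|&w| "-".repeat(w + 2)).collect();
    println!("+{}+", cells.join("+"));
}

fn print_row(cells: &[String], widths: &[usize]) {
    let padded: Vec<String> = cells
        .iter()
        .zip(widths)
        .map(|(c, &w)| format!("{c:<w$}"))
        .collect();
    println!("| {} |", padded.join(" | "));
}

/// Compute widths from the buffered preview, print the header and the buffered
/// rows, and return the widths to use for all later rows.
fn flush_preview(preview: &mut Vec<Vec<String>>, headers: &[&str]) -> Vec<usize> {
    let mut widths: Vec<usize> = headers.iter().map(|h| h.len()).collect();
    for row in preview.iter() {
        for (i, cell) in row.iter().enumerate() {
            widths[i] = widths[i].max(cell.len());
        }
    }
    print_border(&widths);
    let header_cells: Vec<String> = headers.iter().map(|h| h.to_string()).collect();
    print_row(&header_cells, &widths);
    print_border(&widths);
    for row in preview.drain(..) {
        print_row(&row, &widths);
    }
    widths
}

fn print_streaming_table<I>(headers: &[&str], rows: I, preview_limit: usize)
where
    I: IntoIterator<Item = Vec<String>>,
{
    let mut preview: Vec<Vec<String>> = Vec::new();
    let mut widths: Option<Vec<usize>> = None;

    for row in rows {
        if let Some(w) = &widths {
            // Widths already fixed: stream the row straight through.
            print_row(&row, w);
        } else {
            // Still previewing: buffer until the preview limit is reached.
            preview.push(row);
            if preview.len() >= preview_limit {
                widths = Some(flush_preview(&mut preview, headers));
            }
        }
    }
    // Short result set: the preview never filled up, so compute widths now.
    let widths = widths.unwrap_or_else(|| flush_preview(&mut preview, headers));
    print_border(&widths);
}

fn main() {
    let rows = (1..=5).map(|i| vec![i.to_string(), format!("value-{i}")]);
    print_streaming_table(&["id", "name"], rows, 3);
}

As in `process_batch`, the trade-off is that widths are only as good as the preview: rows arriving after the preview that are wider than anything seen so far will overflow their columns rather than trigger a reflow, which is exactly what `test_print_batches_with_preview_and_later_batches` exercises.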