Implement Support for Copy To Logical and Physical plans (#7283)
* rebase

* maybe windows fix

* rebase add explain copy tests

* rebase and fix pipeline
devinjdangelo authored Aug 16, 2023
1 parent 3da9192 commit 7d77448
Showing 27 changed files with 756 additions and 143 deletions.
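
At the DataFrame API level, the main user-visible change is that write_csv, write_json, and write_parquet no longer lower the query to a physical plan and call plan_to_csv/plan_to_json/plan_to_parquet directly. Instead they build a CopyTo logical plan via LogicalPlanBuilder::copy_to and execute it like any other query, returning the Vec<RecordBatch> produced by the copy. A minimal usage sketch follows (not part of this diff; the input path, output path, and the tokio runtime are assumptions):

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical input file; any registered table works the same way.
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;

    let df = ctx.sql("SELECT * FROM example").await?;

    // After this commit, write_csv builds a CopyTo logical plan via
    // LogicalPlanBuilder::copy_to and executes it, returning the record
    // batches produced by the copy operation.
    let _results = df.write_csv("/tmp/example_out/").await?;
    Ok(())
}
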
5 changes: 4 additions & 1 deletion .gitignore
@@ -103,4 +103,7 @@ datafusion/CHANGELOG.md.bak
.githubchangeloggenerator.cache*

# Generated tpch data
datafusion/core/tests/sqllogictests/test_files/tpch/data/*
datafusion/sqllogictests/test_files/tpch/data/*

# Scratch temp dir for sqllogictests
datafusion/sqllogictest/test_files/scratch*
55 changes: 41 additions & 14 deletions datafusion/core/src/dataframe.rs
@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
use datafusion_expr::dml::OutputFileFormat;
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -37,7 +38,6 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
@@ -992,28 +992,55 @@ impl DataFrame {
}

/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
let task_ctx = Arc::new(self.task_ctx());
plan_to_csv(task_ctx, plan, path).await
pub async fn write_csv(
self,
path: &str,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
OutputFileFormat::CSV,
true,
// TODO implement options
vec![],
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Write a `DataFrame` to a Parquet file.
pub async fn write_parquet(
self,
path: &str,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
let task_ctx = Arc::new(self.task_ctx());
plan_to_parquet(task_ctx, plan, path, writer_properties).await
_writer_properties: Option<WriterProperties>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
OutputFileFormat::PARQUET,
true,
// TODO implement options
vec![],
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Executes a query and writes the results to a partitioned JSON file.
pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
let task_ctx = Arc::new(self.task_ctx());
plan_to_json(task_ctx, plan, path).await
pub async fn write_json(
self,
path: &str,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
OutputFileFormat::JSON,
true,
// TODO implement options
vec![],
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Add an additional column to the DataFrame.
99 changes: 68 additions & 31 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -47,7 +47,7 @@ use crate::datasource::physical_plan::{
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use rand::distributions::{Alphanumeric, DistString};
@@ -277,6 +277,7 @@ impl FileFormat for CsvFormat {
"Inserting compressed CSV is not implemented yet.".into(),
));
}

let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
conf,
Expand All @@ -285,7 +286,7 @@ impl FileFormat for CsvFormat {
self.file_compression_type,
));

Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}

@@ -505,12 +506,14 @@ impl DataSink for CsvSink {
let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;

// Construct serializer and writer for each file group
let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for CsvSink in Append mode".into()));
}
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
@@ -542,38 +545,72 @@
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = self.has_header;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;

serializers.push(Box::new(serializer));
writers.push(writer);
match self.config.per_thread_output {
true => {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id =
Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = self.has_header;
let builder =
WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
false => {
let header = self.has_header;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(Box::new(serializer));
let file_path = base_path.prefix();
let object_meta = ObjectMeta {
location: file_path.clone(),
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
}
}

stateless_serialize_and_write_files(data, serializers, writers).await
stateless_serialize_and_write_files(
data,
serializers,
writers,
self.config.per_thread_output,
)
.await
}
}

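The new per_thread_output flag on the sink configuration controls how many objects the CSV sink writes in PutMultipart mode: when true, each input partition is serialized to its own file named {write_id}_{part_idx}.csv, where write_id is a random string that keeps concurrent writes from overwriting each other; when false, a single object is written at the table path, and stateless_serialize_and_write_files receives the flag so it can merge all partitions into that one writer. A hypothetical helper (not part of the diff) that illustrates the resulting object names:

// Hypothetical illustration of the naming rule above; not part of this commit.
fn output_locations(
    base_prefix: &str,
    extension: &str,
    write_id: &str,
    num_partitions: usize,
    per_thread_output: bool,
) -> Vec<String> {
    if per_thread_output {
        // One object per input partition, e.g. "<prefix>/<write_id>_<part_idx>.csv".
        (0..num_partitions)
            .map(|part_idx| format!("{base_prefix}/{write_id}_{part_idx}.{extension}"))
            .collect()
    } else {
        // All partitions are serialized into a single object at the table path.
        vec![base_prefix.to_string()]
    }
}

// For example, output_locations("s3://bucket/table", "csv", "a1b2c3", 2, true)
// returns ["s3://bucket/table/a1b2c3_0.csv", "s3://bucket/table/a1b2c3_1.csv"].
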
83 changes: 58 additions & 25 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -43,7 +43,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

@@ -187,7 +187,7 @@ impl FileFormat for JsonFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));

Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}

@@ -280,6 +280,9 @@ impl DataSink for JsonSink {
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for JsonSink in Append mode".into()));
}
for file_group in &self.config.file_groups {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
@@ -303,33 +306,63 @@ impl DataSink for JsonSink {
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("/{}_{}.json", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
match self.config.per_thread_output {
true => {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id =
Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("{}_{}.json", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
false => {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
let file_path = base_path.prefix();
let object_meta = ObjectMeta {
location: file_path.clone(),
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
}
}

stateless_serialize_and_write_files(data, serializers, writers).await
stateless_serialize_and_write_files(
data,
serializers,
writers,
self.config.per_thread_output,
)
.await
}
}

(Diffs for the remaining changed files are not shown here.)