diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e5f209783a86..bb985e6ea026 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -730,6 +730,7 @@ message PhysicalPlanNode { CooperativeExecNode cooperative = 32; GenerateSeriesNode generate_series = 33; SortMergeJoinExecNode sort_merge_join = 34; + MemoryScanExecNode memory_scan = 35; } } @@ -1041,6 +1042,15 @@ message AvroScanExecNode { FileScanExecConf base_conf = 1; } +message MemoryScanExecNode { + repeated bytes partitions = 1; + datafusion_common.Schema schema = 2; + repeated uint32 projection = 3; + repeated PhysicalSortExprNodeCollection sort_information = 4; + bool show_sizes = 5; + optional uint32 fetch = 6; +} + message CooperativeExecNode { PhysicalPlanNode input = 1; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 5fbfdd5f5422..2ddf063ee878 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -12624,6 +12624,192 @@ impl<'de> serde::Deserialize<'de> for MaybePhysicalSortExprs { deserializer.deserialize_struct("datafusion.MaybePhysicalSortExprs", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for MemoryScanExecNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.partitions.is_empty() { + len += 1; + } + if self.schema.is_some() { + len += 1; + } + if !self.projection.is_empty() { + len += 1; + } + if !self.sort_information.is_empty() { + len += 1; + } + if self.show_sizes { + len += 1; + } + if self.fetch.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.MemoryScanExecNode", len)?; + if !self.partitions.is_empty() { + struct_ser.serialize_field("partitions", &self.partitions.iter().map(pbjson::private::base64::encode).collect::>())?; + } + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + if !self.projection.is_empty() { + struct_ser.serialize_field("projection", &self.projection)?; + } + if !self.sort_information.is_empty() { + struct_ser.serialize_field("sortInformation", &self.sort_information)?; + } + if self.show_sizes { + struct_ser.serialize_field("showSizes", &self.show_sizes)?; + } + if let Some(v) = self.fetch.as_ref() { + struct_ser.serialize_field("fetch", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for MemoryScanExecNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "partitions", + "schema", + "projection", + "sort_information", + "sortInformation", + "show_sizes", + "showSizes", + "fetch", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Partitions, + Schema, + Projection, + SortInformation, + ShowSizes, + Fetch, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "partitions" => Ok(GeneratedField::Partitions), + "schema" => Ok(GeneratedField::Schema), + "projection" => Ok(GeneratedField::Projection), + "sortInformation" | "sort_information" => Ok(GeneratedField::SortInformation), + "showSizes" | "show_sizes" => Ok(GeneratedField::ShowSizes), + "fetch" => Ok(GeneratedField::Fetch), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = MemoryScanExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.MemoryScanExecNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut partitions__ = None; + let mut schema__ = None; + let mut projection__ = None; + let mut sort_information__ = None; + let mut show_sizes__ = None; + let mut fetch__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Partitions => { + if partitions__.is_some() { + return Err(serde::de::Error::duplicate_field("partitions")); + } + partitions__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + GeneratedField::Projection => { + if projection__.is_some() { + return Err(serde::de::Error::duplicate_field("projection")); + } + projection__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::SortInformation => { + if sort_information__.is_some() { + return Err(serde::de::Error::duplicate_field("sortInformation")); + } + sort_information__ = Some(map_.next_value()?); + } + GeneratedField::ShowSizes => { + if show_sizes__.is_some() { + return Err(serde::de::Error::duplicate_field("showSizes")); + } + show_sizes__ = Some(map_.next_value()?); + } + GeneratedField::Fetch => { + if fetch__.is_some() { + return Err(serde::de::Error::duplicate_field("fetch")); + } + fetch__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } + } + } + Ok(MemoryScanExecNode { + partitions: partitions__.unwrap_or_default(), + schema: schema__, + projection: projection__.unwrap_or_default(), + sort_information: sort_information__.unwrap_or_default(), + show_sizes: show_sizes__.unwrap_or_default(), + fetch: fetch__, + }) + } + } + deserializer.deserialize_struct("datafusion.MemoryScanExecNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for NamedStructField { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16802,6 +16988,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::SortMergeJoin(v) => { struct_ser.serialize_field("sortMergeJoin", v)?; } + physical_plan_node::PhysicalPlanType::MemoryScan(v) => { + struct_ser.serialize_field("memoryScan", v)?; + } } } struct_ser.end() @@ -16865,6 +17054,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "generateSeries", "sort_merge_join", "sortMergeJoin", + "memory_scan", + "memoryScan", ]; #[allow(clippy::enum_variant_names)] @@ -16902,6 +17093,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { Cooperative, GenerateSeries, SortMergeJoin, + MemoryScan, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16956,6 +17148,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "cooperative" => Ok(GeneratedField::Cooperative), "generateSeries" | "generate_series" => Ok(GeneratedField::GenerateSeries), "sortMergeJoin" | "sort_merge_join" => Ok(GeneratedField::SortMergeJoin), + "memoryScan" | "memory_scan" => Ok(GeneratedField::MemoryScan), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -17207,6 +17400,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return Err(serde::de::Error::duplicate_field("sortMergeJoin")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::SortMergeJoin) +; + } + GeneratedField::MemoryScan => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("memoryScan")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::MemoryScan) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 65c807e816b5..69f7542e48c9 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1053,7 +1053,7 @@ pub mod table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35" )] pub physical_plan_type: ::core::option::Option, } @@ -1129,6 +1129,8 @@ pub mod physical_plan_node { GenerateSeries(super::GenerateSeriesNode), #[prost(message, tag = "34")] SortMergeJoin(::prost::alloc::boxed::Box), + #[prost(message, tag = "35")] + MemoryScan(super::MemoryScanExecNode), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1591,6 +1593,21 @@ pub struct AvroScanExecNode { pub base_conf: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct MemoryScanExecNode { + #[prost(bytes = "vec", repeated, tag = "1")] + pub partitions: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(message, optional, tag = "2")] + pub schema: ::core::option::Option, + #[prost(uint32, repeated, tag = "3")] + pub projection: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "4")] + pub sort_information: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "5")] + pub show_sizes: bool, + #[prost(uint32, optional, tag = "6")] + pub fetch: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CooperativeExecNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 39ad52d46a80..dec227d168bf 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -19,8 +19,10 @@ use std::sync::Arc; +use arrow::array::RecordBatch; use arrow::compute::SortOptions; use arrow::datatypes::Field; +use arrow::ipc::reader::StreamReader; use chrono::{TimeZone, Utc}; use datafusion_expr::dml::InsertOp; use object_store::path::Path; @@ -556,6 +558,18 @@ pub fn parse_protobuf_file_scan_config( Ok(config) } +pub fn parse_record_batches(buf: &[u8]) -> Result> { + if buf.is_empty() { + return Ok(vec![]); + } + let reader = StreamReader::try_new(buf, None)?; + let mut batches = Vec::new(); + for batch in reader { + batches.push(batch?); + } + Ok(batches) +} + impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { type Error = DataFusionError; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1a1b369fabee..38dd2fdbf537 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -24,11 +24,12 @@ use crate::common::{byte_to_string, str_to_byte}; use crate::physical_plan::from_proto::{ parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs, parse_physical_window_expr, parse_protobuf_file_scan_config, - parse_protobuf_file_scan_schema, + parse_protobuf_file_scan_schema, parse_record_batches, }; use crate::physical_plan::to_proto::{ serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr, - serialize_physical_window_expr, + serialize_physical_sort_exprs, serialize_physical_window_expr, + serialize_record_batches, }; use crate::protobuf::physical_aggregate_expr_node::AggregateFunction; use crate::protobuf::physical_expr_node::ExprType; @@ -41,6 +42,7 @@ use crate::{convert_required, into_required}; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef}; +use datafusion::catalog::memory::MemorySourceConfig; use datafusion::datasource::file_format::csv::CsvSink; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::json::JsonSink; @@ -54,7 +56,7 @@ use datafusion::datasource::physical_plan::{ CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource, }; use datafusion::datasource::sink::DataSinkExec; -use datafusion::datasource::source::DataSourceExec; +use datafusion::datasource::source::{DataSource, DataSourceExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::functions_table::generate_series::{ @@ -164,6 +166,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { PhysicalPlanType::AvroScan(scan) => { self.try_into_avro_scan_physical_plan(scan, ctx, runtime, extension_codec) } + PhysicalPlanType::MemoryScan(scan) => self + .try_into_memory_scan_physical_plan(scan, ctx, runtime, extension_codec), PhysicalPlanType::CoalesceBatches(coalesce_batches) => self .try_into_coalesce_batches_physical_plan( coalesce_batches, @@ -779,6 +783,57 @@ impl protobuf::PhysicalPlanNode { panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled") } + fn try_into_memory_scan_physical_plan( + &self, + scan: &protobuf::MemoryScanExecNode, + ctx: &SessionContext, + _runtime: &RuntimeEnv, + extension_codec: &dyn PhysicalExtensionCodec, + ) -> Result> { + let partitions = scan + .partitions + .iter() + .map(|p| parse_record_batches(p)) + .collect::>>()?; + + let proto_schema = scan.schema.as_ref().ok_or_else(|| { + DataFusionError::Internal( + "schema in MemoryScanExecNode is missing.".to_owned(), + ) + })?; + let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?); + + let projection = if !scan.projection.is_empty() { + Some( + scan.projection + .iter() + .map(|i| *i as usize) + .collect::>(), + ) + } else { + None + }; + + let mut sort_information = vec![]; + for ordering in &scan.sort_information { + let sort_exprs = parse_physical_sort_exprs( + &ordering.physical_sort_expr_nodes, + ctx, + &schema, + extension_codec, + )?; + sort_information.extend(LexOrdering::new(sort_exprs)); + } + + let source = MemorySourceConfig::try_new(&partitions, schema, projection)? + .with_limit(scan.fetch.map(|f| f as usize)) + .with_show_sizes(scan.show_sizes); + + let source = source.try_with_sort_information(sort_information)?; + + Ok(DataSourceExec::from_data_source(source)) + } + fn try_into_coalesce_batches_physical_plan( &self, coalesce_batches: &protobuf::CoalesceBatchesExecNode, @@ -2617,6 +2672,53 @@ impl protobuf::PhysicalPlanNode { } } + if let Some(source_conf) = + data_source.as_any().downcast_ref::() + { + let proto_partitions = source_conf + .partitions() + .iter() + .map(|p| serialize_record_batches(p)) + .collect::>>()?; + + let proto_schema: protobuf::Schema = + source_conf.original_schema().as_ref().try_into()?; + + let proto_projection = source_conf + .projection() + .as_ref() + .map_or_else(Vec::new, |v| { + v.iter().map(|x| *x as u32).collect::>() + }); + + let proto_sort_information = source_conf + .sort_information() + .iter() + .map(|ordering| { + let sort_exprs = serialize_physical_sort_exprs( + ordering.to_owned(), + extension_codec, + )?; + Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection { + physical_sort_expr_nodes: sort_exprs, + }) + }) + .collect::, _>>()?; + + return Ok(Some(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::MemoryScan( + protobuf::MemoryScanExecNode { + partitions: proto_partitions, + schema: Some(proto_schema), + projection: proto_projection, + sort_information: proto_sort_information, + show_sizes: source_conf.show_sizes(), + fetch: source_conf.fetch().map(|f| f as u32), + }, + )), + })); + } + Ok(None) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 64960e39f75d..11db88cacb0d 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -17,7 +17,9 @@ use std::sync::Arc; +use arrow::array::RecordBatch; use arrow::datatypes::Schema; +use arrow::ipc::writer::StreamWriter; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::physical_plan::FileSink; @@ -562,6 +564,20 @@ pub fn serialize_maybe_filter( } } +pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result> { + if batches.is_empty() { + return Ok(vec![]); + } + let schema = batches[0].schema(); + let mut buf = Vec::new(); + let mut writer = StreamWriter::try_new(&mut buf, &schema)?; + for batch in batches { + writer.write(batch)?; + } + writer.finish()?; + Ok(buf) +} + impl TryFrom<&JsonSink> for protobuf::JsonSink { type Error = DataFusionError; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 7ea4a5cb308c..208a6f97f66d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2206,6 +2206,16 @@ async fn roundtrip_logical_plan_sort_merge_join() -> Result<()> { let query = "SELECT t1.* FROM t0 join t1 on t0.a = t1.a"; let plan = ctx.sql(query).await?.create_physical_plan().await?; + roundtrip_test(plan) +} +#[tokio::test] +async fn roundtrip_memory_source() -> Result<()> { + let ctx = SessionContext::new(); + let plan = ctx + .sql("select * from values ('Tom', 18)") + .await? + .create_physical_plan() + .await?; roundtrip_test(plan) }