Skip to content

Commit 6fd5685

Browse files
lewiszlwalambtimsaucer
authored
Memory datasource protobuf support (#17290)
* Add proto * fix proto * gen proto code * exec to proto * gen proto code * impl (de)serialization * Add test * Update submodules * gen proto * Set parquet-testing back to main --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: Tim Saucer <timsaucer@gmail.com>
1 parent 268e115 commit 6fd5685

File tree

7 files changed

+373
-4
lines changed

7 files changed

+373
-4
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,7 @@ message PhysicalPlanNode {
730730
CooperativeExecNode cooperative = 32;
731731
GenerateSeriesNode generate_series = 33;
732732
SortMergeJoinExecNode sort_merge_join = 34;
733+
MemoryScanExecNode memory_scan = 35;
733734
}
734735
}
735736

@@ -1041,6 +1042,15 @@ message AvroScanExecNode {
10411042
FileScanExecConf base_conf = 1;
10421043
}
10431044

1045+
message MemoryScanExecNode {
1046+
repeated bytes partitions = 1;
1047+
datafusion_common.Schema schema = 2;
1048+
repeated uint32 projection = 3;
1049+
repeated PhysicalSortExprNodeCollection sort_information = 4;
1050+
bool show_sizes = 5;
1051+
optional uint32 fetch = 6;
1052+
}
1053+
10441054
message CooperativeExecNode {
10451055
PhysicalPlanNode input = 1;
10461056
}

datafusion/proto/src/generated/pbjson.rs

Lines changed: 200 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 18 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
2020
use std::sync::Arc;
2121

22+
use arrow::array::RecordBatch;
2223
use arrow::compute::SortOptions;
2324
use arrow::datatypes::Field;
25+
use arrow::ipc::reader::StreamReader;
2426
use chrono::{TimeZone, Utc};
2527
use datafusion_expr::dml::InsertOp;
2628
use object_store::path::Path;
@@ -556,6 +558,18 @@ pub fn parse_protobuf_file_scan_config(
556558
Ok(config)
557559
}
558560

561+
pub fn parse_record_batches(buf: &[u8]) -> Result<Vec<RecordBatch>> {
562+
if buf.is_empty() {
563+
return Ok(vec![]);
564+
}
565+
let reader = StreamReader::try_new(buf, None)?;
566+
let mut batches = Vec::new();
567+
for batch in reader {
568+
batches.push(batch?);
569+
}
570+
Ok(batches)
571+
}
572+
559573
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
560574
type Error = DataFusionError;
561575

0 commit comments

Comments
 (0)