Skip to content

Commit 722ccb9

Browse files
authored
feat: Support serde for JsonSource PhysicalPlan (#15311)
1 parent dd9c3a8 commit 722ccb9

File tree

5 files changed

+158
-3
lines changed

5 files changed

+158
-3
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ message DmlNode{
273273
INSERT_APPEND = 3;
274274
INSERT_OVERWRITE = 4;
275275
INSERT_REPLACE = 5;
276-
277276
}
278277
Type dml_type = 1;
279278
LogicalPlanNode input = 2;
@@ -726,6 +725,7 @@ message PhysicalPlanNode {
726725
CsvSinkExecNode csv_sink = 28;
727726
ParquetSinkExecNode parquet_sink = 29;
728727
UnnestExecNode unnest = 30;
728+
JsonScanExecNode json_scan = 31;
729729
}
730730
}
731731

@@ -1024,6 +1024,10 @@ message CsvScanExecNode {
10241024
bool newlines_in_values = 7;
10251025
}
10261026

1027+
message JsonScanExecNode {
1028+
FileScanExecConf base_conf = 1;
1029+
}
1030+
10271031
message AvroScanExecNode {
10281032
FileScanExecConf base_conf = 1;
10291033
}

datafusion/proto/src/generated/pbjson.rs

Lines changed: 106 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
3333
use datafusion::datasource::physical_plan::AvroSource;
3434
#[cfg(feature = "parquet")]
3535
use datafusion::datasource::physical_plan::ParquetSource;
36-
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig};
36+
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, JsonSource};
3737
use datafusion::datasource::source::DataSourceExec;
3838
use datafusion::execution::runtime_env::RuntimeEnv;
3939
use datafusion::execution::FunctionRegistry;
@@ -247,6 +247,15 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
247247
.with_file_compression_type(FileCompressionType::UNCOMPRESSED);
248248
Ok(conf.build())
249249
}
250+
PhysicalPlanType::JsonScan(scan) => {
251+
let scan_conf = parse_protobuf_file_scan_config(
252+
scan.base_conf.as_ref().unwrap(),
253+
registry,
254+
extension_codec,
255+
Arc::new(JsonSource::new()),
256+
)?;
257+
Ok(scan_conf.build())
258+
}
250259
#[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
251260
PhysicalPlanType::ParquetScan(scan) => {
252261
#[cfg(feature = "parquet")]
@@ -1684,6 +1693,26 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
16841693
}
16851694
}
16861695

1696+
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
1697+
let data_source = data_source_exec.data_source();
1698+
if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>()
1699+
{
1700+
let source = scan_conf.file_source();
1701+
if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
1702+
return Ok(protobuf::PhysicalPlanNode {
1703+
physical_plan_type: Some(PhysicalPlanType::JsonScan(
1704+
protobuf::JsonScanExecNode {
1705+
base_conf: Some(serialize_file_scan_config(
1706+
scan_conf,
1707+
extension_codec,
1708+
)?),
1709+
},
1710+
)),
1711+
});
1712+
}
1713+
}
1714+
}
1715+
16871716
#[cfg(feature = "parquet")]
16881717
if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
16891718
let data_source_exec = exec.data_source();

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,15 @@ fn roundtrip_analyze() -> Result<()> {
12771277
)))
12781278
}
12791279

1280+
#[tokio::test]
1281+
async fn roundtrip_json_source() -> Result<()> {
1282+
let ctx = SessionContext::new();
1283+
ctx.register_json("t1", "../core/tests/data/1.json", Default::default())
1284+
.await?;
1285+
let plan = ctx.table("t1").await?.create_physical_plan().await?;
1286+
roundtrip_test(plan)
1287+
}
1288+
12801289
#[test]
12811290
fn roundtrip_json_sink() -> Result<()> {
12821291
let field_a = Field::new("plan_type", DataType::Utf8, false);

0 commit comments

Comments
 (0)