@@ -2,9 +2,8 @@ use std::sync::Arc;
22
33use async_recursion:: async_recursion;
44
5- use datafusion:: prelude:: { CsvReadOptions , DataFrame , ExecutionContext } ;
5+ use datafusion:: prelude:: { DataFrame , ExecutionContext } ;
66use datafusion:: {
7- arrow:: datatypes:: { DataType , Field , Schema } ,
87 error:: { DataFusionError , Result } ,
98 logical_plan:: { DFSchemaRef , Expr , LogicalPlan } ,
109 prelude:: Column ,
@@ -67,7 +66,7 @@ pub fn to_substrait_rel(plan: &LogicalPlan) -> Result<Box<Rel>> {
6766 . iter ( )
6867 . map ( |f| f. name ( ) . to_owned ( ) )
6968 . collect ( ) ,
70- r#struct : None , // TODO
69+ r#struct : None ,
7170 } ) ,
7271 filter : None ,
7372 projection : Some ( MaskExpression {
@@ -105,10 +104,6 @@ pub fn to_substrait_rel(plan: &LogicalPlan) -> Result<Box<Rel>> {
105104 }
106105}
107106
108- /// Convert Substrait Rex to DataFusion Expr
109- // pub fn from_substrait_rex(rex: &Expression) -> Result<Expr> {
110- // }
111-
112107/// Convert Substrait Rel to DataFusion DataFrame
113108#[ async_recursion]
114109pub async fn from_substrait_rel (
@@ -121,60 +116,44 @@ pub async fn from_substrait_rel(
121116 let exprs: Vec < Expr > = p
122117 . expressions
123118 . iter ( )
124- . map ( |e| {
125- match & e. rex_type {
126- Some ( RexType :: Selection ( field_ref) ) => {
127- match & field_ref. reference_type {
128- Some ( MaskedReference ( mask) ) => {
129- //TODO remove unwrap
130- let xx = & mask. select . as_ref ( ) . unwrap ( ) . struct_items ;
131- assert ! ( xx. len( ) == 1 ) ;
132- Ok ( Expr :: Column ( Column {
133- relation : None ,
134- name : input
135- . schema ( )
136- . field ( xx[ 0 ] . field as usize )
137- . name ( )
138- . to_string ( ) ,
139- } ) )
140- }
141- _ => Err ( DataFusionError :: NotImplemented (
142- "unsupported field ref type" . to_string ( ) ,
143- ) ) ,
144- }
145- }
119+ . map ( |e| match & e. rex_type {
120+ Some ( RexType :: Selection ( field_ref) ) => match & field_ref. reference_type {
121+ Some ( MaskedReference ( mask) ) => match & mask. select . as_ref ( ) {
122+ Some ( x) if x. struct_items . len ( ) == 1 => Ok ( Expr :: Column ( Column {
123+ relation : None ,
124+ name : input
125+ . schema ( )
126+ . field ( x. struct_items [ 0 ] . field as usize )
127+ . name ( )
128+ . to_string ( ) ,
129+ } ) ) ,
130+ _ => Err ( DataFusionError :: NotImplemented (
131+ "invalid field reference" . to_string ( ) ,
132+ ) ) ,
133+ } ,
146134 _ => Err ( DataFusionError :: NotImplemented (
147- "unsupported rex_type in projection " . to_string ( ) ,
135+ "unsupported field ref type " . to_string ( ) ,
148136 ) ) ,
149- }
137+ } ,
138+ _ => Err ( DataFusionError :: NotImplemented (
139+ "unsupported rex_type in projection" . to_string ( ) ,
140+ ) ) ,
150141 } )
151142 . collect :: < Result < Vec < _ > > > ( ) ?;
152143
153144 input. select ( exprs)
154145 }
155- Some ( RelType :: Read ( read) ) => {
156- let schema = match & read. base_schema {
157- Some ( named_struct) => Schema :: new (
158- named_struct
159- . names
160- . iter ( )
161- . map ( |n| Field :: new ( n, DataType :: Utf8 , false ) )
162- . collect ( ) ,
163- ) ,
164- _ => unimplemented ! ( ) ,
165- } ;
166-
167- let table_name = match & read. as_ref ( ) . read_type {
168- Some ( ReadType :: NamedTable ( nt) ) => nt. names [ 0 ] . to_owned ( ) ,
169- _ => unimplemented ! ( ) ,
170- } ;
171-
172- //TODO assumes csv for now
173- let options = CsvReadOptions :: new ( ) . has_header ( true ) . schema ( & schema) ;
174- ctx. read_csv ( table_name, options) . await
175- }
146+ Some ( RelType :: Read ( read) ) => match & read. as_ref ( ) . read_type {
147+ Some ( ReadType :: NamedTable ( nt) ) => {
148+ let table_name: String = nt. names [ 0 ] . clone ( ) ;
149+ ctx. table ( & * table_name)
150+ }
151+ _ => Err ( DataFusionError :: NotImplemented (
152+ "Only NamedTable reads are supported" . to_string ( ) ,
153+ ) ) ,
154+ } ,
176155 _ => Err ( DataFusionError :: NotImplemented ( format ! (
177- "{:?}" ,
156+ "Unsupported RelType: {:?}" ,
178157 rel. rel_type
179158 ) ) ) ,
180159 }
0 commit comments