Skip to content

Commit bc677ca

Browse files
committed
add convert for schema to arrow schema
1 parent 6e29ca7 commit bc677ca

File tree

1 file changed

+231
-2
lines changed

1 file changed

+231
-2
lines changed

crates/iceberg/src/arrow.rs

Lines changed: 231 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,21 @@
1717

1818
//! Parquet file data reader
1919
20+
use std::collections::HashMap;
21+
2022
use async_stream::try_stream;
2123
use futures::stream::StreamExt;
22-
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
24+
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
25+
use rust_decimal::prelude::ToPrimitive;
2326

2427
use crate::io::FileIO;
2528
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
26-
use crate::spec::SchemaRef;
29+
use crate::spec::{visit_schema, SchemaRef, SchemaVisitor};
30+
use crate::Error;
31+
use arrow_schema::Field as ArrowField;
32+
use arrow_schema::FieldRef as ArrowFieldRef;
33+
use arrow_schema::Schema as ArrowSchema;
34+
use arrow_schema::{DataType as ArrowType, TimeUnit};
2735

2836
/// Builder to create ArrowReader
2937
pub struct ArrowReaderBuilder {
@@ -106,3 +114,224 @@ impl ArrowReader {
106114
ProjectionMask::all()
107115
}
108116
}
117+
118+
/// The key of column id in the metadata of arrow field.
119+
pub const COLUMN_ID_META_KEY: &str = "column_id";
120+
/// The key of doc in the metadata of arrow field.
121+
pub const DOC: &str = "doc";
122+
123+
struct ToArrowSchemaConverter;
124+
125+
enum ArrowSchemaOrFieldOrType {
126+
Schema(ArrowSchema),
127+
Field(ArrowFieldRef),
128+
Type(ArrowType),
129+
}
130+
131+
impl SchemaVisitor for ToArrowSchemaConverter {
132+
type T = ArrowSchemaOrFieldOrType;
133+
134+
fn schema(&mut self, _schema: &crate::spec::Schema, value: Self::T) -> crate::Result<Self::T> {
135+
let struct_type = match value {
136+
ArrowSchemaOrFieldOrType::Type(ArrowType::Struct(fields)) => fields,
137+
_ => unreachable!(),
138+
};
139+
Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
140+
struct_type,
141+
)))
142+
}
143+
144+
fn field(
145+
&mut self,
146+
field: &crate::spec::NestedFieldRef,
147+
value: Self::T,
148+
) -> crate::Result<Self::T> {
149+
let ty = match value {
150+
ArrowSchemaOrFieldOrType::Type(ty) => ty,
151+
_ => unreachable!(),
152+
};
153+
let mut metadata = HashMap::new();
154+
metadata.insert(COLUMN_ID_META_KEY.to_string(), field.id.to_string());
155+
metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string());
156+
if let Some(doc) = &field.doc {
157+
metadata.insert(DOC.to_string(), doc.clone());
158+
}
159+
Ok(ArrowSchemaOrFieldOrType::Field(
160+
ArrowField::new(field.name.clone(), ty, !field.required)
161+
.with_metadata(metadata)
162+
.into(),
163+
))
164+
}
165+
166+
fn r#struct(
167+
&mut self,
168+
_: &crate::spec::StructType,
169+
results: Vec<Self::T>,
170+
) -> crate::Result<Self::T> {
171+
let fields = results
172+
.into_iter()
173+
.map(|result| match result {
174+
ArrowSchemaOrFieldOrType::Field(field) => field,
175+
_ => unreachable!(),
176+
})
177+
.collect();
178+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Struct(fields)))
179+
}
180+
181+
fn list(&mut self, list: &crate::spec::ListType, value: Self::T) -> crate::Result<Self::T> {
182+
let field = match self.field(&list.element_field, value)? {
183+
ArrowSchemaOrFieldOrType::Field(field) => field,
184+
_ => unreachable!(),
185+
};
186+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::List(field)))
187+
}
188+
189+
fn map(
190+
&mut self,
191+
map: &crate::spec::MapType,
192+
key_value: Self::T,
193+
value: Self::T,
194+
) -> crate::Result<Self::T> {
195+
let key_field = match self.field(&map.key_field, key_value)? {
196+
ArrowSchemaOrFieldOrType::Field(field) => field,
197+
_ => unreachable!(),
198+
};
199+
let value_field = match self.field(&map.value_field, value)? {
200+
ArrowSchemaOrFieldOrType::Field(field) => field,
201+
_ => unreachable!(),
202+
};
203+
let field = ArrowField::new(
204+
"entries",
205+
ArrowType::Struct(vec![key_field, value_field].into()),
206+
map.value_field.required,
207+
);
208+
209+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Map(
210+
field.into(),
211+
false,
212+
)))
213+
}
214+
215+
fn primitive(&mut self, p: &crate::spec::PrimitiveType) -> crate::Result<Self::T> {
216+
match p {
217+
crate::spec::PrimitiveType::Boolean => {
218+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Boolean))
219+
}
220+
crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Int32)),
221+
crate::spec::PrimitiveType::Long => {
222+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Int64))
223+
}
224+
crate::spec::PrimitiveType::Float => {
225+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Float32))
226+
}
227+
crate::spec::PrimitiveType::Double => {
228+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Float64))
229+
}
230+
crate::spec::PrimitiveType::Decimal { precision, scale } => {
231+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Decimal128(
232+
TryInto::try_into(*precision).map_err(|err| {
233+
Error::new(
234+
crate::ErrorKind::DataInvalid,
235+
"incompatible precision for decimal type convert",
236+
)
237+
.with_source(err)
238+
})?,
239+
TryInto::try_into(*scale).map_err(|err| {
240+
Error::new(
241+
crate::ErrorKind::DataInvalid,
242+
"incompatible scale for decimal type convert",
243+
)
244+
.with_source(err)
245+
})?,
246+
)))
247+
}
248+
crate::spec::PrimitiveType::Date => {
249+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Date32))
250+
}
251+
crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
252+
ArrowType::Time32(TimeUnit::Microsecond),
253+
)),
254+
crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
255+
ArrowType::Timestamp(TimeUnit::Microsecond, None),
256+
)),
257+
crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
258+
// Timestampz always stored as UTC
259+
ArrowType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
260+
)),
261+
crate::spec::PrimitiveType::String => {
262+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::Utf8))
263+
}
264+
crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
265+
ArrowType::FixedSizeBinary(16),
266+
)),
267+
crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
268+
len.to_i32()
269+
.map(ArrowType::FixedSizeBinary)
270+
.unwrap_or(ArrowType::LargeBinary),
271+
)),
272+
crate::spec::PrimitiveType::Binary => {
273+
Ok(ArrowSchemaOrFieldOrType::Type(ArrowType::LargeBinary))
274+
}
275+
}
276+
}
277+
}
278+
279+
pub(crate) fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
280+
let mut converter = ToArrowSchemaConverter;
281+
match visit_schema(schema, &mut converter)? {
282+
ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
283+
_ => unreachable!(),
284+
}
285+
}
286+
287+
impl TryFrom<&crate::spec::Schema> for ArrowSchema {
288+
type Error = Error;
289+
290+
fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
291+
schema_to_arrow_schema(schema)
292+
}
293+
}
294+
295+
#[cfg(test)]
296+
mod tests {
297+
use crate::spec::{NestedField, PrimitiveType, Schema, Type};
298+
299+
use super::*;
300+
301+
#[test]
302+
fn test_try_into_arrow_schema() {
303+
let schema = Schema::builder()
304+
.with_fields(vec![
305+
NestedField {
306+
id: 0,
307+
name: "id".to_string(),
308+
required: true,
309+
field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
310+
doc: None,
311+
initial_default: None,
312+
write_default: None,
313+
}
314+
.into(),
315+
NestedField {
316+
id: 1,
317+
name: "data".to_string(),
318+
required: false,
319+
field_type: Box::new(Type::Primitive(PrimitiveType::String)),
320+
doc: None,
321+
initial_default: None,
322+
write_default: None,
323+
}
324+
.into(),
325+
])
326+
.build()
327+
.unwrap();
328+
329+
let arrow_schema = ArrowSchema::try_from(&schema).unwrap();
330+
331+
assert_eq!(arrow_schema.fields().len(), 2);
332+
assert_eq!(arrow_schema.fields()[0].name(), "id");
333+
assert_eq!(arrow_schema.fields()[0].data_type(), &ArrowType::Int64);
334+
assert_eq!(arrow_schema.fields()[1].name(), "data");
335+
assert_eq!(arrow_schema.fields()[1].data_type(), &ArrowType::Utf8);
336+
}
337+
}

0 commit comments

Comments
 (0)