Skip to content

Commit

Permalink
replace from with try_from
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed Sep 10, 2022
1 parent 3987117 commit c1692a2
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 55 deletions.
2 changes: 1 addition & 1 deletion datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ mod roundtrip_tests {
];

for test_case in test_cases.into_iter() {
let proto: super::protobuf::ArrowType = (&test_case).into();
let proto: super::protobuf::ArrowType = (&test_case).try_into().unwrap();
let roundtrip: DataType = (&proto).try_into().unwrap();
assert_eq!(format!("{:?}", test_case), format!("{:?}", roundtrip));
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ impl AsLogicalPlan for LogicalPlanNode {
})
}
};
let schema: protobuf::Schema = schema.as_ref().into();
let schema: protobuf::Schema = schema.as_ref().try_into()?;

let filters: Vec<protobuf::LogicalExprNode> = filters
.iter()
Expand Down Expand Up @@ -1048,7 +1048,7 @@ impl AsLogicalPlan for LogicalPlanNode {
location: location.clone(),
file_type: file_type.clone(),
has_header: *has_header,
schema: Some(df_schema.into()),
schema: Some(df_schema.try_into()?),
table_partition_cols: table_partition_cols.clone(),
if_not_exists: *if_not_exists,
delimiter: String::from(*delimiter),
Expand Down Expand Up @@ -1083,7 +1083,7 @@ impl AsLogicalPlan for LogicalPlanNode {
protobuf::CreateCatalogSchemaNode {
schema_name: schema_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
schema: Some(df_schema.try_into()?),
},
)),
}),
Expand All @@ -1096,7 +1096,7 @@ impl AsLogicalPlan for LogicalPlanNode {
protobuf::CreateCatalogNode {
catalog_name: catalog_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
schema: Some(df_schema.try_into()?),
},
)),
}),
Expand Down
124 changes: 74 additions & 50 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use crate::protobuf::{
self,
arrow_type::ArrowTypeEnum,
plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
Expand Down Expand Up @@ -124,27 +125,36 @@ impl Error {
}
}

impl From<&Field> for protobuf::Field {
fn from(field: &Field) -> Self {
Self {
impl TryFrom<&Field> for protobuf::Field {
type Error = Error;

fn try_from(field: &Field) -> Result<Self, Self::Error> {
let arrow_type = field.data_type().try_into()?;
Ok(Self {
name: field.name().to_owned(),
arrow_type: Some(Box::new(field.data_type().into())),
arrow_type: Some(Box::new(arrow_type)),
nullable: field.is_nullable(),
children: Vec::new(),
}
})
}
}

impl From<&DataType> for protobuf::ArrowType {
fn from(val: &DataType) -> Self {
Self {
arrow_type_enum: Some(val.into()),
}
impl TryFrom<&DataType> for protobuf::ArrowType {
type Error = Error;

fn try_from(val: &DataType) -> Result<Self, Self::Error> {
let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
Ok(Self {
arrow_type_enum: Some(arrow_type_enum),
})
}
}

impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fn from(val: &DataType) -> Self {
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
type Error = Error;

fn try_from(val: &DataType) -> Result<Self, Self::Error> {
let res =
match val {
DataType::Null => Self::None(EmptyMessage {}),
DataType::Boolean => Self::Bool(EmptyMessage {}),
Expand Down Expand Up @@ -185,53 +195,55 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
DataType::Utf8 => Self::Utf8(EmptyMessage {}),
DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
DataType::List(item_type) => Self::List(Box::new(protobuf::List {
field_type: Some(Box::new(item_type.as_ref().into())),
field_type: Some(Box::new(item_type.as_ref().try_into()?)),
})),
DataType::FixedSizeList(item_type, size) => {
Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
field_type: Some(Box::new(item_type.as_ref().into())),
field_type: Some(Box::new(item_type.as_ref().try_into()?)),
list_size: *size,
}))
}
DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
field_type: Some(Box::new(item_type.as_ref().into())),
field_type: Some(Box::new(item_type.as_ref().try_into()?)),
})),
DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
sub_field_types: struct_fields
.iter()
.map(|field| field.into())
.collect::<Vec<_>>(),
.map(|field| field.try_into())
.collect::<Result<Vec<_>, Error>>()?,
}),
DataType::Union(union_types, type_ids, union_mode) => {
let union_mode = match union_mode {
UnionMode::Sparse => protobuf::UnionMode::Sparse,
UnionMode::Dense => protobuf::UnionMode::Dense,
};
Self::Union(protobuf::Union {
union_types: union_types.iter().map(Into::into).collect(),
union_types: union_types.iter().map(|field| field.try_into()).collect::<Result<Vec<_>, Error>>()?,
union_mode: union_mode.into(),
type_ids: type_ids.iter().map(|x| *x as i32).collect(),
})
}
DataType::Dictionary(key_type, value_type) => {
Self::Dictionary(Box::new(protobuf::Dictionary {
key: Some(Box::new(key_type.as_ref().into())),
value: Some(Box::new(value_type.as_ref().into())),
key: Some(Box::new(key_type.as_ref().try_into()?)),
value: Some(Box::new(value_type.as_ref().try_into()?)),
}))
}
DataType::Decimal128(whole, fractional) => Self::Decimal(protobuf::Decimal {
whole: *whole as u64,
fractional: *fractional as u64,
}),
DataType::Decimal256(_, _) => {
unimplemented!("Proto serialization error: The Decimal256 data type is not yet supported")
return Err(Error::General("Proto serialization error: The Decimal256 data type is not yet supported".to_owned()))
}
DataType::Map(_, _) => {
unimplemented!(
"Proto serialization error: The Map data type is not yet supported"
)
return Err(Error::General(
"Proto serialization error: The Map data type is not yet supported".to_owned()
))
}
}
};

Ok(res)
}
}

Expand All @@ -252,48 +264,60 @@ impl From<&Column> for protobuf::Column {
}
}

impl From<&Schema> for protobuf::Schema {
fn from(schema: &Schema) -> Self {
Self {
impl TryFrom<&Schema> for protobuf::Schema {
type Error = Error;

fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
Ok(Self {
columns: schema
.fields()
.iter()
.map(protobuf::Field::from)
.collect::<Vec<_>>(),
}
.map(protobuf::Field::try_from)
.collect::<Result<Vec<_>, Error>>()?,
})
}
}

impl From<SchemaRef> for protobuf::Schema {
fn from(schema: SchemaRef) -> Self {
Self {
impl TryFrom<SchemaRef> for protobuf::Schema {
type Error = Error;

fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
Ok(Self {
columns: schema
.fields()
.iter()
.map(protobuf::Field::from)
.collect::<Vec<_>>(),
}
.map(protobuf::Field::try_from)
.collect::<Result<Vec<_>, Error>>()?,
})
}
}

impl From<&DFField> for protobuf::DfField {
fn from(f: &DFField) -> protobuf::DfField {
protobuf::DfField {
field: Some(f.field().into()),
impl TryFrom<&DFField> for protobuf::DfField {
type Error = Error;

fn try_from(f: &DFField) -> Result<Self, Self::Error> {
Ok(Self {
field: Some(f.field().try_into()?),
qualifier: f.qualifier().map(|r| protobuf::ColumnRelation {
relation: r.to_string(),
}),
}
})
}
}

impl From<&DFSchemaRef> for protobuf::DfSchema {
fn from(s: &DFSchemaRef) -> protobuf::DfSchema {
let columns = s.fields().iter().map(|f| f.into()).collect::<Vec<_>>();
protobuf::DfSchema {
impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
type Error = Error;

fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
let columns = s
.fields()
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<_>, Error>>()?;
Ok(Self {
columns,
metadata: s.metadata().clone(),
}
})
}
}

Expand Down Expand Up @@ -771,7 +795,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Expr::Cast { expr, data_type } => {
let expr = Box::new(protobuf::CastNode {
expr: Some(Box::new(expr.as_ref().try_into()?)),
arrow_type: Some(data_type.into()),
arrow_type: Some(data_type.try_into()?),
});
Self {
expr_type: Some(ExprType::Cast(expr)),
Expand Down Expand Up @@ -954,7 +978,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListValue(
protobuf::ScalarListValue {
field: Some(boxed_field.as_ref().into()),
field: Some(boxed_field.as_ref().try_into()?),
values: Vec::new(),
},
)),
Expand Down Expand Up @@ -1044,7 +1068,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListValue(
protobuf::ScalarListValue {
field: Some(boxed_field.as_ref().into()),
field: Some(boxed_field.as_ref().try_into()?),
values: type_checked_values,
},
)),
Expand Down

0 comments on commit c1692a2

Please sign in to comment.