Skip to content

Commit

Permalink
fix proto deserialization issue
Browse files Browse the repository at this point in the history
  • Loading branch information
guoying06 committed Jan 9, 2025
1 parent 2e9851f commit 1e911d9
Showing 1 changed file with 51 additions and 43 deletions.
94 changes: 51 additions & 43 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,53 +1168,61 @@ pub fn parse_protobuf_partitioning(
}

RepartitionType::RangeRepartition(range_part) => {
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();
if range_part.partition_count == 1 {
Ok(Some(Partitioning::SinglePartitioning()))
} else {
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();

let value_list = &range_part.list_value;

let value_list = &range_part.list_value;
let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
.iter()
.map(|expr: &PhysicalSortExpr| {
Ok(SortField::new_with_options(
expr.expr.data_type(&input.schema())?,
expr.options,
))
})
.collect::<Result<Vec<SortField>>>()?,
)?));

let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
let bound_cols: Vec<ArrayRef> = value_list
.iter()
.map(|expr: &PhysicalSortExpr| {
Ok(SortField::new_with_options(
expr.expr.data_type(&input.schema())?,
expr.options,
))
.map(|x| {
let xx = x.clone().value.unwrap();
let values_ref = match xx {
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
datatype: _opt_scalar_type,
} = scalar_list;
let value_vec: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| {
proto_error("partition::from_proto() error")
})?;
ScalarValue::iter_to_array(value_vec)
.map_err(|_| proto_error("partition::from_proto() error"))
}
_ => Err(proto_error(
"partition::from_proto() bound_list type error",
)),
};
values_ref
})
.collect::<Result<Vec<SortField>>>()?,
)?));

let bound_cols: Vec<ArrayRef> = value_list
.iter()
.map(|x| {
let xx = x.clone().value.unwrap();
let values_ref = match xx {
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
datatype: _opt_scalar_type,
} = scalar_list;
let value_vec: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| proto_error("partition::from_proto() error"))?;
ScalarValue::iter_to_array(value_vec)
.map_err(|_| proto_error("partition::from_proto() error"))
}
_ => Err(proto_error("partition::from_proto() bound_list type error")),
};
values_ref
})
.collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
Arc::new(bound_rows),
)))
.collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
Arc::new(bound_rows),
)))
}
}
}
})
Expand Down

0 comments on commit 1e911d9

Please sign in to comment.