Skip to content

Commit

Permalink
refactor and add simple function to deserialize and serialize proto b… (
Browse files Browse the repository at this point in the history
#4892)

* refactor and add simple function to deserialize and serialize proto bytes without reading/writing to file

* Resolve syntax issues causing compilation errors and modified serialize_bytes to return a Result

* [review] Introduce DataFusionError on failure to encode and decode bytes

* Change error type from DataFusionError::Internal -> DataFusionError::Substrait

* cargo fmt fixes
  • Loading branch information
jdye64 authored Jan 19, 2023
1 parent 5025aa5 commit 6dce728
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions datafusion/substrait/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::producer;

use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::*;

Expand All @@ -27,15 +28,22 @@ use std::fs::OpenOptions;
use std::io::{Read, Write};

pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
let protobuf_out = serialize_bytes(sql, ctx).await;
let mut file = OpenOptions::new().create(true).write(true).open(path)?;
file.write_all(&protobuf_out?)?;
Ok(())
}

pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let proto = producer::to_substrait_plan(&plan)?;

let mut protobuf_out = Vec::<u8>::new();
proto.encode(&mut protobuf_out).unwrap();
let mut file = OpenOptions::new().create(true).write(true).open(path)?;
file.write_all(&protobuf_out)?;
Ok(())
proto.encode(&mut protobuf_out).map_err(|e| {
DataFusionError::Substrait(format!("Failed to encode substrait plan: {}", e))
})?;
Ok(protobuf_out)
}

pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
Expand All @@ -44,7 +52,11 @@ pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
let mut file = OpenOptions::new().read(true).open(path)?;

file.read_to_end(&mut protobuf_in)?;
let proto = Message::decode(&*protobuf_in).unwrap();
deserialize_bytes(protobuf_in).await
}

Ok(Box::new(proto))
pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
Ok(Box::new(Message::decode(&*proto_bytes).map_err(|e| {
DataFusionError::Substrait(format!("Failed to decode substrait plan: {}", e))
})?))
}

0 comments on commit 6dce728

Please sign in to comment.