Skip to content

Commit

Permalink
feat: issue apache#9157 adding read_batches for Vec&lt;RecordBatch&gt;
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Feb 12, 2024
1 parent afb169c commit d3e162b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
25 changes: 24 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,28 @@ impl SessionContext {
.build()?,
))
}

/// Create a [`DataFrame`] for reading a [`Vec[`RecordBatch`]`]
pub fn read_batches(
&self,
batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<DataFrame> {
// check schema uniqueness
let mut batches = batches.into_iter().peekable();
let schema: SchemaRef = batches.peek().unwrap().schema().clone();
let provider = MemTable::try_new(
schema,
batches.into_iter().map(|batch| vec![batch]).collect(),
)?;
Ok(DataFrame::new(
self.state(),
LogicalPlanBuilder::scan(
UNNAMED_TABLE,
provider_as_source(Arc::new(provider)),
None,
)?
.build()?,
))
}
/// Registers a [`ListingTable`] that can assemble multiple files
/// from locations in an [`ObjectStore`] instance into a single
/// table.
Expand Down Expand Up @@ -2148,6 +2169,8 @@ mod tests {
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use crate::variable::VarType;
use arrow_array::Int32Array;
use arrow_schema::{Field, Schema};
use async_trait::async_trait;
use datafusion_expr::Expr;
use std::env;
Expand Down
84 changes: 84 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::{
},
record_batch::RecordBatch,
};
use arrow_array::Float32Array;
use arrow_schema::ArrowError;
use std::sync::Arc;

Expand Down Expand Up @@ -1431,6 +1432,89 @@ async fn unnest_analyze_metrics() -> Result<()> {

Ok(())
}
#[tokio::test]
async fn test_read_batches() -> Result<()> {
let config = SessionConfig::new();
let runtime = Arc::new(RuntimeEnv::default());
let state = SessionState::new_with_config_rt(config, runtime);
let ctx = SessionContext::new_with_state(state);

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("number", DataType::Float32, false),
]));

let batches = vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4, 5])),
Arc::new(Float32Array::from(vec![1.11, 2.22, 3.33])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![0, 1, 2])),
Arc::new(Float32Array::from(vec![4.44, 5.02, 6.03])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![0, 1, 3])),
Arc::new(Float32Array::from(vec![6.01, 2.02, 3.03])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 1, 2])),
Arc::new(Float32Array::from(vec![1000.01, 2.02, 3.03])),
],
)
.unwrap(),
];
let df = ctx.read_batches(batches).unwrap();
df.clone().show().await.unwrap();
let result = df.collect().await?;
let expected = vec![
"+----+---------+",
"| id | number |",
"+----+---------+",
"| 1 | 1.12 |",
"| 2 | 3.4 |",
"| 3 | 2.33 |",
"| 4 | 9.1 |",
"| 5 | 6.66 |",
"| 3 | 1.11 |",
"| 4 | 2.22 |",
"| 5 | 3.33 |",
"| 0 | 4.44 |",
"| 1 | 5.02 |",
"| 2 | 6.03 |",
"| 0 | 6.01 |",
"| 1 | 2.02 |",
"| 3 | 3.03 |",
"| 3 | 1000.01 |",
"| 1 | 2.02 |",
"| 2 | 3.03 |",
"+----+---------+",
];
assert_batches_sorted_eq!(expected, &result);
Ok(())
}

#[tokio::test]
async fn consecutive_projection_same_schema() -> Result<()> {
Expand Down

0 comments on commit d3e162b

Please sign in to comment.