Skip to content

Commit

Permalink
feat: add collect_partitioned funcation for dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
francis-du committed Sep 7, 2022
1 parent 8a2c905 commit 3e79ae1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
11 changes: 11 additions & 0 deletions datafusion/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,14 @@ def test_explain(df):
column("a") - column("b"),
)
df.explain()


def test_collect_partitioned():
ctx = SessionContext()

batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)

assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()
13 changes: 12 additions & 1 deletion src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ impl PyDataFrame {
batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
}

/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
fn collect_partitioned(&self, py: Python) -> PyResult<Vec<Vec<PyObject>>> {
let batches = wait_for_future(py, self.df.collect_partitioned())?;

batches
.into_iter()
.map(|rbs| rbs.into_iter().map(|rb| rb.to_pyarrow(py)).collect())
.collect()
}

/// Print the result, 20 lines by default
#[args(num = "20")]
fn show(&self, py: Python, num: usize) -> PyResult<()> {
Expand All @@ -147,7 +158,7 @@ impl PyDataFrame {
"The join type {} does not exist or is not implemented",
how
))
.into())
.into());
}
};

Expand Down

0 comments on commit 3e79ae1

Please sign in to comment.