Skip to content

Commit

Permalink
fix: conflicting
Browse files Browse the repository at this point in the history
  • Loading branch information
francis-du committed Sep 9, 2022
1 parent eb01db1 commit aa15f32
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
11 changes: 11 additions & 0 deletions datafusion/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,14 @@ def test_except_all():
df_a_e_b = df_a.except_all(df_b).sort(column("a").sort(ascending=True))

assert df_c.collect() == df_a_e_b.collect()


def test_collect_partitioned():
ctx = SessionContext()

batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)

assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()
11 changes: 11 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ impl PyDataFrame {
batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
}

/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
fn collect_partitioned(&self, py: Python) -> PyResult<Vec<Vec<PyObject>>> {
let batches = wait_for_future(py, self.df.collect_partitioned())?;

batches
.into_iter()
.map(|rbs| rbs.into_iter().map(|rb| rb.to_pyarrow(py)).collect())
.collect()
}

/// Print the result, 20 lines by default
#[args(num = "20")]
fn show(&self, py: Python, num: usize) -> PyResult<()> {
Expand Down

0 comments on commit aa15f32

Please sign in to comment.