Support with_projection and with_columns in ipc and parquet reader in eager mode #1751

Merged
71 changes: 70 additions & 1 deletion polars/polars-io/src/ipc.rs
@@ -63,6 +63,8 @@ pub struct IpcReader<R> {
/// Aggregate all chunks into a single chunk after reading.
rechunk: bool,
stop_after_n_rows: Option<usize>,
projection: Option<Vec<usize>>,
columns: Option<Vec<String>>,
}

impl<R: Read + Seek> IpcReader<R> {
@@ -82,6 +84,20 @@ impl<R: Read + Seek> IpcReader<R> {
self.stop_after_n_rows = num_rows;
self
}

/// Columns to select/project. Takes precedence over `with_projection`.
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
self.columns = columns;
self
}

/// Set the reader's column projection. This counts from 0, meaning that
/// `vec![0, 4]` would select the 1st and 5th columns.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}

#[cfg(feature = "lazy")]
// todo! hoist to lazy crate
pub fn finish_with_scan_ops(
@@ -127,8 +143,11 @@ where
reader,
rechunk: true,
stop_after_n_rows: None,
columns: None,
projection: None,
}
}

fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
@@ -137,7 +156,21 @@
fn finish(mut self) -> Result<DataFrame> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
let schema = metadata.schema();

if let Some(cols) = self.columns {
let mut prj = Vec::with_capacity(cols.len());
for column in cols.iter() {
let i = schema.index_of(column)?;
prj.push(i);
}

// The IPC reader panics if the projection is not in increasing order, so sort it first.
prj.sort_unstable();
self.projection = Some(prj);
}

let ipc_reader = read::FileReader::new(&mut self.reader, metadata, self.projection);
finish_reader(ipc_reader, rechunk, self.stop_after_n_rows, None, None)
}
}
@@ -192,6 +225,8 @@ where
#[cfg(test)]
mod test {
use crate::prelude::*;
use polars_core::df;
use polars_core::prelude::*;
use std::io::Cursor;

#[test]
@@ -208,4 +243,38 @@ mod test {
let df_read = IpcReader::new(buf).finish().unwrap();
assert!(df.frame_equal(&df_read));
}

#[test]
fn test_read_ipc_with_projection() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();

IpcWriter::new(&mut buf).finish(&df).expect("ipc writer");
buf.set_position(0);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcReader::new(buf)
.with_projection(Some(vec![1, 2]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
assert!(df_read.frame_equal(&expected));
}

#[test]
fn test_read_ipc_with_columns() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();

IpcWriter::new(&mut buf).finish(&df).expect("ipc writer");
buf.set_position(0);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcReader::new(buf)
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
assert!(df_read.frame_equal(&expected));
}
}
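Taken together, the new eager IPC selection API reads like the tests above. A minimal sketch, assuming the `polars-core` and `polars-io` preludes re-export the same names the in-crate tests pull from `crate::prelude::*`:

use polars_core::{df, prelude::*};
use polars_io::prelude::*;
use std::io::Cursor;

fn ipc_select_by_name() -> Result<()> {
    // Write a small frame to an in-memory IPC buffer.
    let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5])?;
    IpcWriter::new(&mut buf).finish(&df)?;
    buf.set_position(0);

    // Requesting ["c", "b"] still yields ("b", "c"): the name-derived
    // projection is sorted before reading, so columns come back in file order.
    let selected = IpcReader::new(buf)
        .with_columns(Some(vec!["c".to_string(), "b".to_string()]))
        .finish()?;
    assert_eq!(selected.shape(), (3, 2));
    Ok(())
}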
70 changes: 69 additions & 1 deletion polars/polars-io/src/parquet.rs
@@ -35,6 +35,8 @@ pub struct ParquetReader<R: Read + Seek> {
reader: R,
rechunk: bool,
stop_after_n_rows: Option<usize>,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
}

impl<R> ParquetReader<R>
@@ -75,6 +77,19 @@ where
self
}

/// Columns to select/project. Takes precedence over `with_projection`.
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
self.columns = columns;
self
}

/// Set the reader's column projection. This counts from 0, meaning that
/// `vec![0, 4]` would select the 1st and 5th columns.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}

pub fn schema(mut self) -> Result<Schema> {
let metadata = read::read_metadata(&mut self.reader)?;

@@ -102,6 +117,8 @@ where
reader,
rechunk: false,
stop_after_n_rows: None,
columns: None,
projection: None,
}
}

@@ -112,10 +129,22 @@ where

fn finish(mut self) -> Result<DataFrame> {
let rechunk = self.rechunk;
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::get_schema(&metadata)?;

if let Some(cols) = self.columns {
let mut prj = Vec::with_capacity(cols.len());
for col in cols.iter() {
let i = schema.index_of(col)?;
prj.push(i);
}

self.projection = Some(prj);
}

let reader = read::RecordReader::try_new(
&mut self.reader,
self.projection,
self.stop_after_n_rows,
None,
None,
@@ -256,6 +285,7 @@ mod test {
use crate::prelude::*;
use polars_core::{df, prelude::*};
use std::fs::File;
use std::io::Cursor;

#[test]
fn test_parquet() {
@@ -289,4 +319,42 @@
assert!(read.frame_equal_missing(&df));
Ok(())
}

#[test]
fn test_read_parquet_with_projection() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();

ParquetWriter::new(&mut buf)
.finish(&df)
.expect("parquet writer");
buf.set_position(0);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = ParquetReader::new(buf)
.with_projection(Some(vec![1, 2]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
assert!(df_read.frame_equal(&expected));
}

#[test]
fn test_read_parquet_with_columns() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();

ParquetWriter::new(&mut buf)
.finish(&df)
.expect("parquet writer");
buf.set_position(0);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = ParquetReader::new(buf)
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
assert!(df_read.frame_equal(&expected));
}
}
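A parallel sketch for the parquet reader, under the same assumptions. One difference from the IPC path: this `finish` passes a name-derived projection through unsorted, so index-based selection is the predictable route here:

use polars_core::{df, prelude::*};
use polars_io::prelude::*;
use std::io::Cursor;

fn parquet_select_by_index() -> Result<()> {
    let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5])?;
    ParquetWriter::new(&mut buf).finish(&df)?;
    buf.set_position(0);

    // Projection indices count from 0: [0, 2] selects the 1st and 3rd
    // columns, i.e. "a" and "c".
    let selected = ParquetReader::new(buf)
        .with_projection(Some(vec![0, 2]))
        .finish()?;
    assert_eq!(selected.shape(), (3, 2));
    Ok(())
}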
15 changes: 12 additions & 3 deletions py-polars/polars/eager/frame.py
@@ -486,6 +486,8 @@ def read_csv(
@staticmethod
def read_parquet(
file: Union[str, BinaryIO],
columns: Optional[tp.List[str]] = None,
projection: Optional[tp.List[int]] = None,
stop_after_n_rows: Optional[int] = None,
) -> "DataFrame":
"""
@@ -499,11 +501,18 @@ def read_parquet(
Only read the specified number of rows of the dataset; after `n` rows, reading stops.
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.read_parquet(
file, columns, projection, stop_after_n_rows
)
return self

@staticmethod
def read_ipc(
file: Union[str, BinaryIO],
columns: Optional[tp.List[str]] = None,
projection: Optional[tp.List[int]] = None,
stop_after_n_rows: Optional[int] = None,
) -> "DataFrame":
"""
Read into a DataFrame from the Arrow IPC stream format. This is also called the Feather format.

@@ -517,7 +526,7 @@ def read_ipc(file: Union[str, BinaryIO]) -> "DataFrame":
DataFrame
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.read_ipc(file, columns, projection, stop_after_n_rows)
return self

@staticmethod
8 changes: 3 additions & 5 deletions py-polars/polars/io.py
@@ -580,7 +580,7 @@ def read_ipc(
)
tbl = pa.feather.read_table(data, memory_map=memory_map, columns=columns)
return pl.DataFrame._from_arrow(tbl)
return pl.DataFrame.read_ipc(data, columns=columns)


def read_parquet(
@@ -626,9 +626,7 @@ def read_parquet(
raise ValueError(
"'stop_after_n_rows' cannot be used with 'use_pyarrow=True'."
)

storage_options = storage_options or {}
with _prepare_file_arg(source, **storage_options) as source_prep:
if use_pyarrow:
@@ -642,7 +640,7 @@
)
)
return pl.DataFrame.read_parquet(
source_prep, stop_after_n_rows=stop_after_n_rows, columns=columns
)


25 changes: 22 additions & 3 deletions py-polars/src/dataframe.rs
@@ -174,17 +174,26 @@ impl PyDataFrame {

#[staticmethod]
#[cfg(feature = "parquet")]
pub fn read_parquet(
py_f: PyObject,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
stop_after_n_rows: Option<usize>,
) -> PyResult<Self> {
use EitherRustPythonFile::*;

let result = match get_either_file(py_f, false)? {
Py(f) => {
let buf = f.as_buffer();
ParquetReader::new(buf)
.with_projection(projection)
.with_columns(columns)
.with_stop_after_n_rows(stop_after_n_rows)
.finish()
}
Rust(f) => ParquetReader::new(f)
.with_projection(projection)
.with_columns(columns)
.with_stop_after_n_rows(stop_after_n_rows)
.finish(),
};
@@ -194,9 +203,19 @@

#[staticmethod]
#[cfg(feature = "ipc")]
pub fn read_ipc(
py_f: PyObject,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
stop_after_n_rows: Option<usize>,
) -> PyResult<Self> {
let file = get_file_like(py_f, false)?;
let df = IpcReader::new(file)
.with_projection(projection)
.with_columns(columns)
.with_stop_after_n_rows(stop_after_n_rows)
.finish()
.map_err(PyPolarsEr::from)?;
Ok(PyDataFrame::new(df))
}

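The bindings simply thread `columns`, `projection`, and `stop_after_n_rows` through to the Rust readers, so the precedence rule from the `finish` implementations above applies here too: when both selectors are set, the name-based one wins, because `finish` rebuilds the projection from `columns`. A hedged sketch, under the same assumed preludes as above (`get_column_names` is the standard polars-core accessor):

use polars_core::{df, prelude::*};
use polars_io::prelude::*;
use std::io::Cursor;

fn columns_override_projection() -> Result<()> {
    let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    let df = df!("a" => [1, 2, 3], "b" => [2, 3, 4])?;
    IpcWriter::new(&mut buf).finish(&df)?;
    buf.set_position(0);

    let out = IpcReader::new(buf)
        .with_projection(Some(vec![0])) // alone, this would select "a"
        .with_columns(Some(vec!["b".to_string()])) // overrides: selects "b"
        .finish()?;
    assert_eq!(out.get_column_names(), vec!["b"]);
    Ok(())
}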
13 changes: 13 additions & 0 deletions py-polars/tests/test_io.py
@@ -28,6 +28,19 @@ def test_to_from_buffer(df):
assert df.frame_equal(df_1, null_equal=True)


def test_select_columns_from_buffer():
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
for to_fn, from_fn in zip(
[df.to_parquet, df.to_ipc], [pl.read_parquet, pl.read_ipc]
):
f = io.BytesIO()
to_fn(f)
f.seek(0)
df_1 = from_fn(f, columns=["b", "c"], use_pyarrow=False)
assert df_1.frame_equal(expected)


def test_read_web_file():
url = "https://raw.githubusercontent.com/pola-rs/polars/master/examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv"
df = pl.read_csv(url)