1 change: 1 addition & 0 deletions docs/source/python/integration.rst
@@ -38,3 +38,4 @@ This allows to easily integrate PyArrow with other languages and technologies.
integration/python_java
integration/extending
integration/cuda
integration/dataset
124 changes: 124 additions & 0 deletions docs/source/python/integration/dataset.rst
@@ -0,0 +1,124 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

Extending PyArrow Datasets
==========================

Member: Should we say that this is currently experimental, and list the things that we know are on the roadmap?

.. warning::

    This protocol is currently experimental.

PyArrow provides a core protocol for datasets, so third-party libraries can both
produce and consume classes that conform to a useful subset of the PyArrow dataset
API. This subset provides enough functionality to enable projection
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.

Producers are scanner implementations. For example, table formats like Delta
Lake and Iceberg might provide their own dataset implementations. Consumers
are typically query engines, such as DuckDB, DataFusion, Polars, and Dask.
Providing a common API avoids a situation where supporting ``N`` dataset
formats in ``M`` query engines requires ``N * M`` integrations.

.. image:: pyarrow_dataset_protocol.svg
   :alt: A diagram showing the workflow for using the PyArrow Dataset protocol.
         There are two flows shown, one for streams and one for tasks. The stream
         case shows a linear flow from a producer class, to a dataset, to a
         scanner, and finally to a RecordBatchReader. The tasks case shows a
         similar diagram, except the dataset is split into fragments, which are
         then distributed to tasks, which each create their own scanner and
         RecordBatchReader.

Producers are responsible for outputting a class that conforms to the protocol.

Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.
Comment on lines +39 to +43

Member: This is "ok", but the definitions of producer and consumer here are reversed from what they are in Substrait, which confused me for a while. Maybe we can go with "Data producer" and "Data consumer"?

Member Author: Ha, it finally clicked why I myself find these terms confusing 🤣. It feels backwards! I'll think of new names.

As an example, from the user's perspective, this is what the code looks like
when retrieving a Delta Lake table as a dataset and using it in DuckDB:

.. code-block:: python
    :emphasize-lines: 2,6

    from deltalake import DeltaTable
    table = DeltaTable("path/to/table")
    dataset = table.to_pyarrow_dataset()

    import duckdb
    df = duckdb.arrow(dataset)
    df.project("y")

Here, DuckDB would pass the projection of ``y`` down to the producer
through the dataset protocol. The deltalake scanner would then read only the
column ``y``. Thus, the user gets the performance benefits of pushing
down projections while specifying them in their preferred query engine.


Dataset Producers
-----------------

If you are writing a library that implements a new data source, you'll want to
produce a PyArrow-compatible dataset. Your dataset could be backed by the classes
implemented in PyArrow, or you could implement your own classes. Either way, you
should implement the protocol below.
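
As an illustration only, a minimal producer backed by a single in-memory
``pyarrow.Table`` might look like the following sketch. The class names
(``TableDataset``, ``TableScanner``) are hypothetical, and the sketch ignores
``batch_size`` and ``use_threads`` for simplicity:

.. code-block:: python

    from typing import Iterator, List, Optional

    import pyarrow as pa


    class TableScanner:
        """Hypothetical scanner over an in-memory table."""

        def __init__(self, table: pa.Table, columns: Optional[List[str]] = None):
            # Projection pushdown: keep only the requested columns.
            self._table = table.select(columns) if columns is not None else table

        def count_rows(self) -> int:
            return self._table.num_rows

        def head(self, num_rows: int) -> pa.Table:
            return self._table.slice(0, num_rows)

        def to_reader(self) -> pa.RecordBatchReader:
            return pa.RecordBatchReader.from_batches(
                self._table.schema, self._table.to_batches())


    class TableDataset:
        """Hypothetical dataset whose only fragment is the table itself."""

        def __init__(self, table: pa.Table):
            self._table = table

        @property
        def schema(self) -> pa.Schema:
            return self._table.schema

        def scanner(self, columns: Optional[List[str]] = None,
                    batch_size: Optional[int] = None,
                    use_threads: bool = True, **kwargs) -> TableScanner:
            # batch_size and use_threads are accepted but ignored in this sketch.
            return TableScanner(self._table, columns=columns)

        def get_fragments(self, **kwargs) -> Iterator["TableDataset"]:
            # A single fragment covering the whole table.
            yield self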

Dataset Consumers
-----------------

If you are writing a query engine, you'll want to be able to
consume any PyArrow dataset. To make sure your integration is compatible
with any dataset, you should only call methods that are included in the
protocol. The dataset implementations provided by PyArrow implement additional
options and methods beyond those, but they should not be relied upon without
checking for the specific classes.

There are two general patterns for consuming PyArrow datasets: reading a single
stream or creating a scan task per fragment.

If you have a streaming execution model, you can receive a single stream
of data by calling ``dataset.scanner(columns=...).to_reader()``.
This will return a RecordBatchReader, which can be exported over the
:ref:`C Stream Interface <c-stream-interface>`. The record batches yielded
from the stream can then be passed to worker threads for parallelism.
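
For example, a streaming consumer might look like the following sketch, where
``dataset`` is any object implementing the protocol, the column names are
placeholders, and ``process`` stands in for engine-specific work on each batch:

.. code-block:: python

    # Read the whole dataset as one stream of record batches.
    reader = dataset.scanner(columns=["x", "y"]).to_reader()
    for batch in reader:
        process(batch)  # e.g. hand the batch off to a worker thread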

If you are using a task-based model, you can split the dataset into fragments
and then distribute those fragments into tasks that create their own scanners
and readers. In this case, the code looks more like:

.. code-block:: python

    fragments = list(dataset.get_fragments(columns=...))

    def scan_partition(i):
        # Each task builds its own scanner and reader from one fragment.
        fragment = fragments[i]
        scanner = fragment.scanner()
        return scanner.to_reader()

Fragments are pickleable, so they can be passed to remote workers in a
distributed system.
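
As an illustrative sketch (the choice of executor and the column name are
assumptions, not part of the protocol), a task-based consumer could ship
pickled fragments to worker processes like this:

.. code-block:: python

    import pickle
    from concurrent.futures import ProcessPoolExecutor

    def scan_pickled_fragment(payload):
        # Each worker rebuilds the fragment and creates its own scanner/reader.
        fragment = pickle.loads(payload)
        reader = fragment.scanner(columns=["y"]).to_reader()
        return reader.read_all()  # materialize this task's result as a Table

    payloads = [pickle.dumps(frag) for frag in dataset.get_fragments()]
    with ProcessPoolExecutor() as pool:
        tables = list(pool.map(scan_pickled_fragment, payloads))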

If your engine supports projection (column) pushdown,
you can pass the selected columns down to the dataset by passing them to the ``scanner``.
Column pushdown is limited to selecting a subset of columns from the schema.
Some implementations, including PyArrow, may also support projecting and
renaming columns, but this is not part of the protocol.


The protocol
------------

This module can be imported starting in PyArrow ``13.0.0`` at
``pyarrow.dataset.protocol``. The protocol is defined with ``typing.Protocol``
classes. They can be checked at runtime with ``isinstance`` but can also be
checked statically with Python type checkers like ``mypy``.
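
As a sketch of how a consumer might use these checks (``run_query`` is a
hypothetical function name), the protocol classes can serve both as runtime
guards and as static type annotations:

.. code-block:: python

    import pyarrow.dataset.protocol as protocol

    def run_query(dataset: protocol.Dataset) -> None:  # checked by mypy
        if not isinstance(dataset, protocol.Dataset):  # checked at runtime
            raise TypeError("expected an object implementing the dataset protocol")
        reader = dataset.scanner(columns=["y"]).to_reader()
        ...  # feed the reader into the engine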

.. literalinclude:: ../../../../python/pyarrow/dataset/protocol.py
:language: python
4 changes: 4 additions & 0 deletions docs/source/python/integration/pyarrow_dataset_protocol.svg
File renamed without changes.
158 changes: 158 additions & 0 deletions python/pyarrow/dataset/protocol.py
@@ -0,0 +1,158 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Protocol definitions for pyarrow.dataset

These provide the abstract interface for a dataset. Other libraries may implement
this interface to expose their data, without having to extend PyArrow's classes.

Applications and libraries that want to consume datasets should accept datasets
that implement these protocols, rather than requiring the specific
PyArrow classes.

The pyarrow.dataset.Dataset class itself implements this protocol.

See Extending PyArrow Datasets for more information:

https://arrow.apache.org/docs/python/integration/dataset.html
"""
import sys
from abc import abstractmethod
from typing import Iterator, List, Optional

# TODO: remove once we drop support for Python 3.7
if sys.version_info >= (3, 8):
    from typing import Protocol, runtime_checkable
else:
    from typing_extensions import Protocol, runtime_checkable

from pyarrow.dataset import Expression
from pyarrow import Table, RecordBatchReader, Schema


@runtime_checkable
class Scanner(Protocol):
"""
A scanner implementation for a dataset.

This may be a scan of a whole dataset, or a scan of a single fragment.
"""
@abstractmethod
def count_rows(self) -> int:
"""
Count the number of rows in this dataset or fragment.

Implementors may provide optimized code paths that compute this from metadata.

Returns
-------
int
The number of rows in the dataset or fragment.
"""
...

@abstractmethod
def head(self, num_rows: int) -> Table:
Member: For head and count_rows, should we say that not all producers will support them and that they may raise NotImplementedError?

Member Author: Yeah, that's reasonable. They aren't that important to the protocol's goals; I almost considered removing them.

Contributor: I would leave them in. In Iceberg we can do a count without touching the actual data files (if you don't use a filter).

Member Author: Leaving them in.

"""
Get the first ``num_rows`` rows of the dataset or fragment.

Parameters
----------
num_rows : int
The number of rows to return.

Returns
-------
Table
A table containing the first ``num_rows`` rows of the dataset or fragment.
"""
...

@abstractmethod
def to_reader(self) -> RecordBatchReader:
"""
Create a Record Batch Reader for this scan.

This is used to read the data in chunks.

Returns
-------
RecordBatchReader
"""
...


@runtime_checkable
class Scannable(Protocol):
@abstractmethod
def scanner(self, columns: Optional[List[str]] = None,
Contributor:

Suggested change:
-    def scanner(self, columns: Optional[List[str]] = None,
+    def scanner(self, columns: Optional[Tuple[str, ...]] = None,

Nit, I prefer to use Tuples over Lists because:

  • They are immutable
  • And therefore also hashable:

    >>> hash((1, 2, 3))
    529344067295497451
    >>> hash([1, 2, 3])
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: unhashable type: 'list'

Member Author: I don't disagree in principle, but I'm also trying to keep this somewhat compatible. Though maybe we can loosen it to Sequence[str]?

Actually, it currently supports List[str] | Dict[str, Expression]. Do we want to support the dictionary generally, or keep the protocol more narrow than that?

if columns is not None:
if isinstance(columns, dict):
for expr in columns.values():
if not isinstance(expr, Expression):
raise TypeError(
"Expected an Expression for a 'column' dictionary "
"value, got {} instead".format(type(expr))
)
c_exprs.push_back((<Expression> expr).unwrap())
check_status(
builder.Project(c_exprs, [tobytes(c) for c in columns.keys()])
)
elif isinstance(columns, list):
check_status(builder.ProjectColumns([tobytes(c) for c in columns]))
else:
raise ValueError(
"Expected a list or a dict for 'columns', "
"got {} instead.".format(type(columns))
)

Member: The difference between Sequence[str] and Dict[str, Expression] is significant. The former only allows you to pick which columns to load; the latter introduces the concept of projection expressions, which is big.

Member Author: Related to #35568 (comment), we might want to only allow Sequence[str] here so we only support column selection and reordering. That way we don't need to require consumers to perform projections. I don't think any existing consumer relies on this.

batch_size: Optional[int] = None,
use_threads: bool = True,
**kwargs) -> Scanner:
"""Create a scanner for this dataset.

Parameters
----------
columns : List[str], optional
Names of columns to include in the scan. If None, all columns are
included.
batch_size : int, optional
Member: Should batch_size be part of to_reader instead of scanner?

Member Author: I'm open to that. But I don't want to add that to the protocol without also implementing it in PyArrow Datasets. So if we think this is important, I'll remove this for now.

The number of rows to include in each batch. If None, the default
value is used. The default value is implementation specific.
Comment on lines +111 to +113

Member: Maybe expand that the default value is not only implementation specific but might not even be consistent between batches?

Also, is this a min/max or a maximum only? In other words, if batch_size is 1_000_000 and the source is a Parquet file with 10 row groups of 100_000 rows, does the scanner need to accumulate the rows, or is it acceptable to return smaller batches?

Member Author: This is a good detail to think about. I don't think we should require it to be exact; for example, if the reader can't read in exactly that batch size, I don't think it should error. But I do think readers should make a best effort to be as close to the batch size as possible, even if that means splitting row groups into chunks, for example.

Though you might have more informed opinions here; what do you think is reasonable to expect?

Member: This parameter has lost importance in Arrow C++ datasets. It used to be an important tuning parameter that affected the size of the batches used internally by the C++ implementation. However, it didn't make sense for the user to pick the correct value (and there are multiple batch sizes in the C++, and the right value might even depend on the schema and be quite difficult to calculate).

I think it still has value, especially as a "max batch size". The user needs some way to say "don't give me 20GB of data all at once".

So I think it needs to be a hard upper limit, but it can be a soft lower limit. We could either call it max_batch_size (and ignore it as a lower limit entirely) or preferred_batch_size (and explain that only the upper limit is strictly enforced). I don't think using this as an upper limit is overly burdensome, as slicing tables/batches should be pretty easy and lightweight. The reverse (concatenating batches) is more complicated and expensive.

use_threads : bool, default True
Whether to use multiple threads to read the rows. Often consumers
reading a whole dataset in one scanner will keep this
as True, while consumers reading a single fragment per worker will
set this to False.
"""
...


@runtime_checkable
class Fragment(Scannable, Protocol):
Member: There are some things I would like to have here, as a user, but I understand we are just getting started and trying to be minimal. So take these as suggestions:

__repr__ <-- converting a fragment to a string is very useful for debugging

estimated_cost <-- I get why this one isn't there, but a fragment might be 5 rows or it might be 5 million rows, and that could be valuable for figuring out how to distribute a dataset workload. Still, there is no universal way of estimating cost, so perhaps we can leave this for an extension.

"""A fragment of a dataset.

This might be a partition, a file, a file chunk, etc.

This class should be pickleable so that it can be used in a distributed scan."""
...


@runtime_checkable
class Dataset(Scannable, Protocol):
@abstractmethod
def get_fragments(
self, **kwargs
) -> Iterator[Fragment]:
"""Get the fragments of this dataset.

Parameters
----------
**kwargs : dict
Additional arguments to pass to underlying implementation.
"""
...

@property
@abstractmethod
def schema(self) -> Schema:
"""
Get the schema of this dataset.

Returns
-------
Schema
"""
...
29 changes: 29 additions & 0 deletions python/pyarrow/tests/test_dataset_protocol.py
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test that PyArrow datasets conform to the protocol."""
import pyarrow.dataset.protocol as protocol
import pyarrow.dataset as ds


def test_dataset_protocol():
assert isinstance(ds.Dataset, protocol.Dataset)
assert isinstance(ds.Fragment, protocol.Fragment)

assert isinstance(ds.Dataset, protocol.Scannable)
assert isinstance(ds.Fragment, protocol.Scannable)

assert isinstance(ds.Scanner, protocol.Scanner)