diff --git a/docs/source/python/integration.rst b/docs/source/python/integration.rst
index 997bc52102f..1c05b9f3e19 100644
--- a/docs/source/python/integration.rst
+++ b/docs/source/python/integration.rst
@@ -38,3 +38,4 @@ This allows to easily integrate PyArrow with other languages and technologies.
    integration/python_java
    integration/extending
    integration/cuda
+   integration/dataset
diff --git a/docs/source/python/integration/dataset.rst b/docs/source/python/integration/dataset.rst
new file mode 100644
index 00000000000..4d12b89896a
--- /dev/null
+++ b/docs/source/python/integration/dataset.rst
@@ -0,0 +1,124 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

Extending PyArrow Datasets
==========================

.. warning::

    This protocol is currently experimental.

PyArrow provides a core protocol for datasets, so third-party libraries can
both produce and consume classes that conform to a useful subset of the
PyArrow dataset API. This subset provides enough functionality to support
projection pushdown, and is defined in ``pyarrow.dataset.protocol``.

.. image:: pyarrow_dataset_protocol.svg
   :alt: A diagram showing the workflow for using the PyArrow Dataset protocol.
         There are two flows shown, one for streams and one for tasks. The
         stream case shows a linear flow from a producer class, to a dataset,
         to a scanner, and finally to a RecordBatchReader. The tasks case shows
         a similar diagram, except the dataset is split into fragments, which
         are then distributed to tasks, which each create their own scanner
         and RecordBatchReader.

Producers are responsible for outputting a class that conforms to the protocol.

Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.

As an example, from the user's perspective, this is what the code looks like
to retrieve a Delta Lake table as a dataset and use it in DuckDB:

.. code-block:: python
   :emphasize-lines: 2,6

   from deltalake import DeltaTable
   table = DeltaTable("path/to/table")
   dataset = table.to_pyarrow_dataset()

   import duckdb
   df = duckdb.arrow(dataset)
   df.project("y")

Here, DuckDB would pass the projection of ``y`` down to the producer through
the dataset protocol. The deltalake scanner would then only read the column
``y``. Thus, the user gets the performance benefits of pushing down
projections while specifying them in their preferred query engine.


Dataset Producers
-----------------

If you are a library implementing a new data source, you'll want to be able
to produce a PyArrow-compatible dataset. Your dataset could be backed by the
classes implemented in PyArrow, or you could implement your own classes.
Either way, you should implement the protocol described below, as in the
following sketch.
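For instance, a producer backed by an in-memory table might look like the
following. This is a minimal sketch: the ``TableScanner``, ``TableFragment``,
and ``InMemoryProtocolDataset`` names are hypothetical, and only the method
names and signatures come from the protocol.

.. code-block:: python

   import pyarrow as pa


   class TableScanner:
       """Scanner over a (possibly projected) in-memory table."""

       def __init__(self, table, columns=None):
           # Honor projection pushdown by selecting only requested columns.
           self._table = table if columns is None else table.select(columns)

       def count_rows(self) -> int:
           return self._table.num_rows

       def head(self, num_rows: int) -> pa.Table:
           return self._table.slice(0, num_rows)

       def to_reader(self) -> pa.RecordBatchReader:
           return self._table.to_reader()


   class TableFragment:
       """One fragment of the dataset; here, a single record batch."""

       def __init__(self, table):
           self._table = table

       def scanner(self, columns=None, batch_size=None, use_threads=True,
                   **kwargs):
           return TableScanner(self._table, columns)


   class InMemoryProtocolDataset:
       """A protocol-conforming dataset backed by a pyarrow.Table."""

       def __init__(self, table):
           self._table = table

       @property
       def schema(self) -> pa.Schema:
           return self._table.schema

       def scanner(self, columns=None, batch_size=None, use_threads=True,
                   **kwargs):
           return TableScanner(self._table, columns)

       def get_fragments(self, **kwargs):
           # One fragment per record batch, purely for illustration.
           for batch in self._table.to_batches():
               yield TableFragment(pa.Table.from_batches([batch]))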
Dataset Consumers
-----------------

If you are writing a query engine, you'll want to be able to consume any
PyArrow dataset. To make sure your integration is compatible with any
dataset, you should only call methods that are included in the protocol. The
dataset implementations provided by PyArrow implement additional options and
methods beyond these, but they should not be relied upon without checking for
the specific classes.

There are two general patterns for consuming PyArrow datasets: reading a
single stream or creating a scan task per fragment.

If you have a streaming execution model, you can receive a single stream
of data by calling ``dataset.scanner(columns=...).to_reader()``.
This will return a RecordBatchReader, which can be exported over the
:ref:`C Stream Interface <c-stream-interface>`. The record batches yielded
from the stream can then be passed to worker threads for parallelism.

If you are using a task-based model, you can split the dataset into fragments
and then distribute those fragments into tasks that create their own scanners
and readers. In this case, the code looks more like:

.. code-block:: python

   fragments = list(dataset.get_fragments())

   def scan_partition(i):
       fragment = fragments[i]
       scanner = fragment.scanner(columns=...)
       return scanner.to_reader()

Fragments are pickleable, so they can be passed to remote workers in a
distributed system.

If your engine supports projection (column) pushdown, you can pass the
selected columns down to the dataset by passing them to the ``scanner``
method. Column pushdown is limited to selecting a subset of columns from the
schema. Some implementations, including PyArrow, may also support projecting
and renaming columns, but this is not part of the protocol.


The protocol
------------

This module can be imported starting in PyArrow ``13.0.0`` at
``pyarrow.dataset.protocol``. The protocol is defined with ``typing.Protocol``
classes. They can be checked at runtime with ``isinstance``, but can also be
checked statically with Python type checkers like ``mypy``.

.. literalinclude:: ../../../../python/pyarrow/dataset/protocol.py
   :language: python
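For example, a consumer might combine a runtime conformance check with the
streaming pattern described above. This is a minimal sketch; the
``consume_stream`` helper is hypothetical, not part of PyArrow:

.. code-block:: python

   from pyarrow.dataset.protocol import Dataset


   def consume_stream(dataset, columns=None):
       """Consume any protocol-conforming dataset as a single stream."""
       # The protocol classes are runtime_checkable, so isinstance()
       # verifies that the required methods are present.
       if not isinstance(dataset, Dataset):
           raise TypeError("not a PyArrow-compatible dataset")

       # One reader for the whole dataset, with projection pushdown
       # applied through the ``columns`` argument.
       reader = dataset.scanner(columns=columns).to_reader()
       total_rows = 0
       for batch in reader:
           total_rows += batch.num_rows  # stand-in for real per-batch work
       return total_rows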
diff --git a/docs/source/python/integration/pyarrow_dataset_protocol.svg b/docs/source/python/integration/pyarrow_dataset_protocol.svg
new file mode 100644
index 00000000000..7b6e464ca69
--- /dev/null
+++ b/docs/source/python/integration/pyarrow_dataset_protocol.svg
@@ -0,0 +1,4 @@
[SVG diagram, text content only: a figure titled "PyArrow Dataset Protocol"
with two panels, "Stream" and "Tasks", each split into "Producer" and
"Consumer" lanes. The Stream panel flows Producer class -> Dataset -> Scanner
-> RecordBatchReader; the Tasks panel splits the Dataset into Fragments, each
scanned by its own Scanner and RecordBatchReader.]
\ No newline at end of file
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset/__init__.py
similarity index 100%
rename from python/pyarrow/dataset.py
rename to python/pyarrow/dataset/__init__.py
diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py
new file mode 100644
index 00000000000..d8696507ba0
--- /dev/null
+++ b/python/pyarrow/dataset/protocol.py
@@ -0,0 +1,158 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Protocol definitions for pyarrow.dataset

These provide the abstract interface for a dataset. Other libraries may
implement this interface to expose their data, without having to extend
PyArrow's classes.

Applications and libraries that want to consume datasets should accept
datasets that implement these protocols, rather than requiring the specific
PyArrow classes.

The pyarrow.dataset.Dataset class itself implements this protocol.

See Extending PyArrow Datasets for more information:

https://arrow.apache.org/docs/python/integration/dataset.html
"""
import sys
from abc import abstractmethod
from typing import Iterator, List, Optional

# TODO: remove once we drop support for Python 3.7
if sys.version_info >= (3, 8):
    from typing import Protocol, runtime_checkable
else:
    from typing_extensions import Protocol, runtime_checkable

from pyarrow import Table, RecordBatchReader, Schema


@runtime_checkable
class Scanner(Protocol):
    """
    A scanner implementation for a dataset.

    This may be a scan of a whole dataset, or a scan of a single fragment.
    """
    @abstractmethod
    def count_rows(self) -> int:
        """
        Count the number of rows in this dataset or fragment.

        Implementors may provide optimized code paths that compute this from
        metadata.

        Returns
        -------
        int
            The number of rows in the dataset or fragment.
        """
        ...

    @abstractmethod
    def head(self, num_rows: int) -> Table:
        """
        Get the first ``num_rows`` rows of the dataset or fragment.

        Parameters
        ----------
        num_rows : int
            The number of rows to return.

        Returns
        -------
        Table
            A table containing the first ``num_rows`` rows of the dataset or
            fragment.
        """
        ...

    @abstractmethod
    def to_reader(self) -> RecordBatchReader:
        """
        Create a RecordBatchReader for this scan.

        This is used to read the data in chunks.

        Returns
        -------
        RecordBatchReader
        """
        ...


@runtime_checkable
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                batch_size: Optional[int] = None,
                use_threads: bool = True,
                **kwargs) -> Scanner:
        """Create a scanner for this dataset.

        Parameters
        ----------
        columns : List[str], optional
            Names of columns to include in the scan.
            If None, all columns are included.
        batch_size : int, optional
            The number of rows to include in each batch. If None, the default
            value is used. The default value is implementation specific.
        use_threads : bool, default True
            Whether to use multiple threads to read the rows. Consumers
            reading a whole dataset in one scanner will often keep this as
            True, while consumers reading a single fragment per worker will
            set it to False.
        """
        ...


@runtime_checkable
class Fragment(Scannable, Protocol):
    """A fragment of a dataset.

    This might be a partition, a file, a file chunk, etc.

    This class should be pickleable so that it can be used in a distributed
    scan."""
    ...


@runtime_checkable
class Dataset(Scannable, Protocol):
    @abstractmethod
    def get_fragments(
        self, **kwargs
    ) -> Iterator[Fragment]:
        """Get the fragments of this dataset.

        Parameters
        ----------
        **kwargs : dict
            Additional arguments to pass to the underlying implementation.
        """
        ...

    @property
    @abstractmethod
    def schema(self) -> Schema:
        """
        Get the schema of this dataset.

        Returns
        -------
        Schema
        """
        ...
diff --git a/python/pyarrow/tests/test_dataset_protocol.py b/python/pyarrow/tests/test_dataset_protocol.py
new file mode 100644
index 00000000000..f2567415492
--- /dev/null
+++ b/python/pyarrow/tests/test_dataset_protocol.py
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test that PyArrow datasets conform to the protocol."""
import pyarrow.dataset.protocol as protocol
import pyarrow.dataset as ds


def test_dataset_protocol():
    # The protocol classes are runtime_checkable, so isinstance() only
    # checks that the expected attributes are present; checking the
    # classes themselves verifies the methods exist.
    assert isinstance(ds.Dataset, protocol.Dataset)
    assert isinstance(ds.Fragment, protocol.Fragment)

    assert isinstance(ds.Dataset, protocol.Scannable)
    assert isinstance(ds.Fragment, protocol.Scannable)

    assert isinstance(ds.Scanner, protocol.Scanner)
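

# A possible instance-level check (a sketch, not part of the original tests):
# build a small in-memory dataset and verify that a concrete instance, not
# just the class, satisfies the protocol end to end.
def test_dataset_instance_conforms():
    import pyarrow as pa

    table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    dataset = ds.dataset(table)  # wraps the table in an InMemoryDataset

    assert isinstance(dataset, protocol.Dataset)

    # Projection pushdown through the protocol's scanner() method.
    scanner = dataset.scanner(columns=["a"])
    assert isinstance(scanner, protocol.Scanner)
    assert scanner.count_rows() == 3

    reader = scanner.to_reader()
    assert reader.read_all().column_names == ["a"]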