
Commit be648e2: remove filters for now
Parent: 0f8a61c


2 files changed: 37 additions, 69 deletions


docs/source/python/integration/dataset.rst

Lines changed: 23 additions & 33 deletions
@@ -18,9 +18,13 @@
 Extending PyArrow Datasets
 ==========================
 
+.. warning::
+
+   This protocol is currently experimental.
+
 PyArrow provides a core protocol for datasets, so third-party libraries can both
 produce and consume classes that conform to useful subset of the PyArrow dataset
-API. This subset provides enough functionality to provide predicate and filter
+API. This subset provides enough functionality to provide projection
 pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.
 
 .. image:: pyarrow_dataset_protocol.svg
@@ -38,18 +42,24 @@ Consumers are responsible for calling methods on the protocol to get the data
 out of the dataset. The protocol supports getting data as a single stream or
 as a series of tasks which may be distributed.
 
-From the perspective of a user, the code looks like:
+As an example, here is what the code looks like from the user's perspective
+to retrieve a Delta Lake table as a dataset and use it in DuckDB:
 
 .. code-block:: python
+   :emphasize-lines: 2,6
+
+   from deltalake import DeltaTable
+   table = DeltaTable("path/to/table")
+   dataset = table.to_pyarrow_dataset()
 
-   dataset = producer_library.get_dataset(...)
-   df = consumer_library.read_dataset(dataset)
-   df.filter("x > 0").select("y")
+   import duckdb
+   df = duckdb.arrow(dataset)
+   df.project("y")
 
-Here, the consumer would pass the filter ``x > 0`` and the projection of ``y`` down
-to the producer through the dataset protocol. Thus, the user gets to enjoy the
-performance benefits of pushing down filters and projections while being able
-to specify those in their preferred query engine.
+Here, DuckDB would pass the projection of ``y`` down to the producer
+through the dataset protocol. The deltalake scanner would then only read the
+column ``y``. Thus, the user gets to enjoy the performance benefits of pushing
+down projections while being able to specify those in their preferred query engine.
 
 
 Dataset Producers
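
For illustration, the pushdown in the example above is roughly equivalent to the following consumer-side calls against the protocol (a sketch; ``dataset`` stands for any object implementing the protocol, such as the one returned by ``to_pyarrow_dataset()``):

    # Request only column "y"; the producer's scanner reads just that column.
    reader = dataset.scanner(columns=["y"]).to_reader()
    table = reader.read_all()  # a pyarrow.Table containing only column "y"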
@@ -60,24 +70,6 @@ produce a PyArrow-compatible dataset. Your dataset could be backed by the classe
 implemented in PyArrow or you could implement your own classes. Either way, you
 should implement the protocol below.
 
-When implementing the dataset, consider the following:
-
-* Filters passed down should be fully executed. While other systems have scanners
-  that are "best-effort", only executing the parts of the filter that it can, PyArrow
-  datasets should always remove all rows that don't match the filter. If the
-  implementation cannot execute the filter, it should raise an exception. A
-  limited set of expressions are allowed in these filters for the general
-  protocol. See the docstrings for ``Scannable`` below for details.
-* The API does not require that a dataset has metadata about all fragments
-  loaded into memory. Indeed, to scale to very large Datasets, don't eagerly
-  load all the fragment metadata into memory. Instead, load fragment metadata
-  once a filter is passed. This allows you to skip loading metadata about
-  fragments that aren't relevant to queries. For example, if you have a dataset
-  that uses Hive-style paritioning for a column ``date`` and the user passes a
-  filter for ``date=2023-01-01``, then you can skip listing directory for HIVE
-  partitions that don't match that date.
-
-
 Dataset Consumers
 -----------------
 
@@ -92,7 +84,7 @@ There are two general patterns for consuming PyArrow datasets: reading a single
 stream or creating a scan task per fragment.
 
 If you have a streaming execution model, you can receive a single stream
-of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``.
+of data by calling ``dataset.scanner(columns=...).to_reader()``.
 This will return a RecordBatchReader, which can be exported over the
 :ref:`C Stream Interface <c-stream-interface>`. The record batches yield
 from the stream can then be passed to worker threads for parallelism.
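
A short sketch of this streaming pattern, assuming ``dataset`` implements the protocol and ``process_batch`` is a hypothetical per-batch function supplied by the engine:

    from concurrent.futures import ThreadPoolExecutor

    # One stream for the whole dataset; batches are handed to worker threads.
    reader = dataset.scanner(columns=["x", "y"]).to_reader()
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(process_batch, reader))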
@@ -103,7 +95,7 @@ and readers. In this case, the code looks more like:
 
 .. code-block:: python
 
-   fragments = list(dataset.get_fragments(filter=..., columns=...))
+   fragments = list(dataset.get_fragments(columns=...))
 
    def scan_partition(i):
        fragment = fragments[i]
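
Continuing the sketch in this hunk, a worker might finish ``scan_partition`` along these lines (``columns`` here is an assumed projection pushed down by the engine):

    def scan_partition(i):
        fragment = fragments[i]
        # One fragment per worker, so per-fragment threading is usually disabled.
        reader = fragment.scanner(columns=columns, use_threads=False).to_reader()
        for batch in reader:
            ...  # hand each record batch to the engine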
@@ -113,13 +105,11 @@ and readers. In this case, the code looks more like:
 Fragments are pickleable, so they can be passed to remote workers in a
 distributed system.
 
-If your engine supports predicate (filter) and projection (column) pushdown,
+If your engine supports projection (column) pushdown,
 you can pass those down to the dataset by passing them to the ``scanner``.
 Column pushdown is limited to selecting a subset of columns from the schema.
 Some implementations, including PyArrow may also support projecting and
-renaming columns, but this is not part of the protocol. Predicate pushdown
-is limited to a subset of expressions. See the docstrings for ``Scannable``
-for the allowed expressions.
+renaming columns, but this is not part of the protocol.
 
 
 The protocol

python/pyarrow/dataset/protocol.py

Lines changed: 14 additions & 36 deletions
@@ -23,11 +23,13 @@
 that implement these protocols, rather than requiring the specific
 PyArrow classes.
 
+The pyarrow.dataset.Dataset class itself implements this protocol.
+
 See Extending PyArrow Datasets for more information:
 
 https://arrow.apache.org/docs/python/integration/dataset.html
 """
-from abc import abstractmethod, abstractproperty
+from abc import abstractmethod
 from typing import Iterator, List, Optional
 
 # TODO: remove once we drop support for Python 3.7
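
As the added docstring line notes, ``pyarrow.dataset.Dataset`` itself satisfies the protocol, so a plain PyArrow dataset can be handed to any consumer written against it. A minimal sketch (the path and format below are placeholders):

    import pyarrow.dataset as ds

    dataset = ds.dataset("path/to/files/", format="parquet")
    # The same protocol calls a third-party consumer would make:
    reader = dataset.scanner(columns=["y"]).to_reader()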
@@ -50,21 +52,21 @@ class Scanner(Protocol):
     @abstractmethod
     def count_rows(self) -> int:
         """
-        Count the number of rows in this dataset.
+        Count the number of rows in this dataset or fragment.
 
         Implementors may provide optimized code paths that compute this from metadata.
 
         Returns
         -------
         int
-            The number of rows in the dataset.
+            The number of rows in the dataset or fragment.
         """
         ...
 
     @abstractmethod
     def head(self, num_rows: int) -> Table:
         """
-        Get the first ``num_rows`` rows of the dataset.
+        Get the first ``num_rows`` rows of the dataset or fragment.
 
         Parameters
         ----------
@@ -74,7 +76,7 @@ def head(self, num_rows: int) -> Table:
         Returns
         -------
         Table
-            A table containing the first ``num_rows`` rows of the dataset.
+            A table containing the first ``num_rows`` rows of the dataset or fragment.
         """
         ...
 
@@ -96,7 +98,7 @@ def to_reader(self) -> RecordBatchReader:
 class Scannable(Protocol):
     @abstractmethod
     def scanner(self, columns: Optional[List[str]] = None,
-                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                batch_size: Optional[int] = None,
                 use_threads: bool = True,
                 **kwargs) -> Scanner:
         """Create a scanner for this dataset.
@@ -106,33 +108,14 @@ def scanner(self, columns: Optional[List[str]] = None,
         columns : List[str], optional
             Names of columns to include in the scan. If None, all columns are
             included.
-        filter : Expression, optional
-            Filter expression to apply to the scan. If None, no filter is applied.
         batch_size : int, optional
             The number of rows to include in each batch. If None, the default
             value is used. The default value is implementation specific.
         use_threads : bool, default True
-            Whether to use multiple threads to read the rows. It is expected
-            that consumers reading a whole dataset in one scanner will keep this
+            Whether to use multiple threads to read the rows. Often consumers
+            reading a whole dataset in one scanner will keep this
             as True, while consumers reading a single fragment per worker will
-            typically set this to False.
-
-        Notes
-        -----
-        The filters must be fully satisfied. If the dataset cannot satisfy the
-        filter, it should raise an error.
-
-        Only the following expressions are allowed in the filter:
-        - Equality / inequalities (==, !=, <, >, <=, >=)
-        - Conjunctions (and, or)
-        - Field references (e.g. "a" or "a.b.c")
-        - Literals (e.g. 1, 1.0, "a", True)
-        - cast
-        - is_null / not_null
-        - isin
-        - between
-        - negation (not)
-
+            set this to False.
         """
         ...
 
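To make the shape of the protocol concrete, here is a minimal producer sketch (not part of this commit) backed by an in-memory ``pyarrow.Table``; the class names are illustrative only:

    import pyarrow as pa

    class TableScanner:
        """A minimal Scanner over an in-memory table."""
        def __init__(self, table, batch_size=None):
            self._table = table
            self._batch_size = batch_size

        def count_rows(self):
            return self._table.num_rows

        def head(self, num_rows):
            return self._table.slice(0, num_rows)

        def to_reader(self):
            return self._table.to_reader(max_chunksize=self._batch_size)

    class InMemoryDataset:
        """A minimal Dataset/Scannable over an in-memory table."""
        def __init__(self, table):
            self._table = table

        @property
        def schema(self):
            return self._table.schema

        def scanner(self, columns=None, batch_size=None, use_threads=True, **kwargs):
            # Projection pushdown: keep only the requested columns.
            table = self._table.select(columns) if columns is not None else self._table
            return TableScanner(table, batch_size)

        def get_fragments(self, **kwargs):
            yield self  # a single fragment covering the whole table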
@@ -151,24 +134,19 @@ class Fragment(Scannable, Protocol):
 class Dataset(Scannable, Protocol):
     @abstractmethod
     def get_fragments(
-        self,
-        filter: Optional[Expression] = None, **kwargs
+        self, **kwargs
     ) -> Iterator[Fragment]:
         """Get the fragments of this dataset.
 
         Parameters
         ----------
-        filter : Expression, optional
-            Filter expression to use to prune which fragments are selected.
-            See Scannable.scanner for details on allowed filters. The filter is
-            just used to prune which fragments are selected. It does not need to
-            save the filter to apply to the scan. That is handled by the scanner.
         **kwargs : dict
             Additional arguments to pass to underlying implementation.
         """
         ...
 
-    @abstractproperty
+    @property
+    @abstractmethod
     def schema(self) -> Schema:
         """
         Get the schema of this dataset.
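
The last change above swaps the deprecated ``abc.abstractproperty`` for the modern stacking of ``@property`` over ``@abstractmethod``. A generic illustration of that pattern, using ``abc.ABC`` rather than ``typing.Protocol``:

    from abc import ABC, abstractmethod

    class Base(ABC):
        @property
        @abstractmethod
        def schema(self):
            """Subclasses must provide a read-only ``schema`` property."""

    class Concrete(Base):
        @property
        def schema(self):
            return "y: int64"

    Concrete().schema   # works; instantiating Base() would raise TypeError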
