Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Add Arrow->Python datetime support #417

Merged
merged 24 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ license = {text = "Apache-2.0"}
requires-python = ">=3.8"

[project.optional-dependencies]
test = ["pyarrow", "pytest", "numpy"]
verify = ["pytest", "numpy"]
test = ["pyarrow", "python-dateutil", "pytest", "numpy"]
verify = ["python-dateutil", "pytest", "numpy"]

[project.urls]
Homepage = "https://arrow.apache.org"
Expand Down
186 changes: 184 additions & 2 deletions python/src/nanoarrow/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import warnings
from functools import cached_property
from itertools import islice
from typing import Iterable, Tuple
Expand Down Expand Up @@ -83,6 +84,14 @@ def iter_tuples(obj, schema=None) -> Iterable[Tuple]:
return RowTupleIterator.get_iterator(obj, schema=schema)


class InvalidArrayWarning(UserWarning):
    """Issued when array values are invalid for their declared Arrow type.

    For example, a time value whose storage encodes one or more whole days
    (a valid time-of-day never spans a day boundary).
    """

    pass


class LossyConversionWarning(UserWarning):
    """Issued when conversion to a Python object loses precision.

    For example, when nanosecond storage is truncated to the microsecond
    resolution supported by :mod:`datetime` objects.
    """

    pass


class ArrayViewIterator:
"""Base class for iterators that use an internal ArrowArrayView
as the basis for conversion to Python objects. Intended for internal use.
Expand All @@ -101,7 +110,7 @@ def __init__(self, schema, *, _array_view=None):
map(self._make_child, self._schema.children, self._array_view.children)
)

if schema.dictionary is None:
if self._schema.dictionary is None:
self._dictionary = None
else:
self._dictionary = self._make_child(
Expand All @@ -115,6 +124,13 @@ def _make_child(self, schema, array_view):
def _child_names(self):
return [child.name for child in self._schema.children]

@cached_property
def _object_label(self):
if self._schema.name:
return f"{self._schema.name} <{self._schema_view.type}>"
else:
return f"<unnamed {self._schema_view.type}>"

def _contains_nulls(self):
return (
self._schema_view.nullable
Expand All @@ -126,6 +142,9 @@ def _set_array(self, array):
self._array_view._set_array(array)
return self

def _warn(self, message, category):
warnings.warn(f"{self._object_label}: {message}", category)


class PyIterator(ArrayViewIterator):
"""Iterate over the Python object version of values in an ArrowArrayView.
Expand Down Expand Up @@ -244,6 +263,125 @@ def _binary_iter(self, offset, length):
for start, end in zip(starts, ends):
yield bytes(data[start:end])

def _date_iter(self, offset, length):
from datetime import date, timedelta

storage = self._primitive_iter(offset, length)
epoch = date(1970, 1, 1)

if self._schema_view.type_id == CArrowType.DATE32:
for item in storage:
if item is None:
yield item
else:
yield epoch + timedelta(item)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datetime.date also has a fromtimestamp method that accepts seconds since epoch, so you could also use that, which seems to be slightly faster:

In [53]: item = 10957

In [54]: epoch = datetime.date(1970, 1, 1)

In [55]: epoch + datetime.timedelta(item)
Out[55]: datetime.date(2000, 1, 1)

In [56]: datetime.date.fromtimestamp(60*60*24*item)
Out[56]: datetime.date(2000, 1, 1)

In [57]: %timeit epoch + datetime.timedelta(item)
164 ns ± 0.569 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)

In [58]: %timeit datetime.date.fromtimestamp(60*60*24*item)
118 ns ± 0.0869 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the same for DATE64 if converting the milliseconds to seconds

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I do see in the Python documentation ( https://docs.python.org/3/library/datetime.html#datetime.date.fromtimestamp )

It’s common for this to be restricted to years from 1970 through 2038

...which is ominous (if up-to-date, which it might not be anymore).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good point. But this is true for datetime.datetime.fromtimestamp as well, though ..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to avoid fromtimestamp() for now in both cases (even though it's quite a bit slower) and added some dates that will make a future test fail if this is actually a problem. Optimizing these conversions is probably a good project for some future (less busy) time.

else:
for item in storage:
if item is None:
yield item
else:
yield epoch + timedelta(milliseconds=item)

def _time_iter(self, offset, length):
from datetime import time

for item in self._iter_time_components(offset, length):
if item is None:
yield None
else:
days, hours, mins, secs, us = item
if days != 0:
self._warn("days != 0", InvalidArrayWarning)

yield time(hours, mins, secs, us)

def _timestamp_iter(self, offset, length):
from datetime import datetime

epoch = datetime(1970, 1, 1, tzinfo=_get_tzinfo("UTC"))
parent = self._duration_iter(offset, length)

tz = self._schema_view.timezone
if tz:
tz = _get_tzinfo(tz)

for item in parent:
if item is None:
yield None
else:
yield (epoch + item).astimezone(tz)
else:
for item in parent:
if item is None:
yield None
else:
yield (epoch + item).replace(tzinfo=None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for item in parent:
if item is None:
yield None
else:
yield (epoch + item).replace(tzinfo=None)
epoch = epoch.replace(tzinfo=None)
for item in parent:
if item is None:
yield None
else:
yield epoch + item

No need to do the replace each time inside the for loop I think?


def _duration_iter(self, offset, length):
from datetime import timedelta

storage = self._primitive_iter(offset, length)

unit = self._schema_view.time_unit
if unit == "s":
to_us = 1_000_000
Comment on lines +327 to +328
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For unit of seconds, it's probably a bit more efficient to pass the seconds directly to timedelta(seconds=..) than first converting to microseconds, because then the timedelta constructor will then convert the microseconds back to seconds internally.

Don't know if that is worth it though, because of course the current logic makes the loop a bit simpler

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a lot that could be optimized here...this pass is mostly for completeness/correctness. Probably this is a job for C or C++ + and Python C API where we can do some of these things efficiently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a lot that could be optimized here...this pass is mostly for completeness/correctness. Probably this is a job for C or C++ + and Python C API where we can do some of these things efficiently.

FWIW, I think it is also nice that this is just in Python (and it's still faster than pyarrow to_pylist anyway). But it's true the bigger gain will probably be found in moving this to C(ython) (at least if we use numpy as baseline, then this specific duration iteration can be improved 10x: this PR for 1M elements: 340ms, pyarrow: 480 ms, this PR but with directly passing seconds: 280ms, numpy: 30ms)

elif unit == "ms":
to_us = 1000
elif unit == "us":
to_us = 1
elif unit == "ns":
storage = self._iter_us_from_ns(storage)
to_us = 1

for item in storage:
if item is None:
yield None
else:
yield timedelta(microseconds=item * to_us)

def _iter_time_components(self, offset, length):
storage = self._primitive_iter(offset, length)

unit = self._schema_view.time_unit
if unit == "s":
to_us = 1_000_000
elif unit == "ms":
to_us = 1000
elif unit == "us":
to_us = 1
elif unit == "ns":
storage = self._iter_us_from_ns(storage)
to_us = 1

us_per_sec = 1_000_000
us_per_min = us_per_sec * 60
us_per_hour = us_per_min * 60
us_per_day = us_per_hour * 24

for item in storage:
if item is None:
yield None
else:
us = item * to_us
days = us // us_per_day
us = us % us_per_day
hours = us // us_per_hour
us = us % us_per_hour
mins = us // us_per_min
us = us % us_per_min
secs = us // us_per_sec
us = us % us_per_sec
yield days, hours, mins, secs, us

def _iter_us_from_ns(self, parent):
for item in parent:
if item is None:
yield None
else:
if item % 1000 != 0:
self._warn("nanoseconds discarded", LossyConversionWarning)
yield item // 1000

def _primitive_iter(self, offset, length):
view = self._array_view
offset += view.offset
Expand All @@ -258,7 +396,7 @@ def _primitive_iter(self, offset, length):

class RowTupleIterator(PyIterator):
"""Iterate over rows of a struct array (stream) where each row is a
tuple instead of a dictionary. This is ~3x faster and matches other
tuple instead of a dictionary. This is usually faster and matches other
Python concepts more closely (e.g., dbapi's cursor, pandas itertuples).
Intended for internal use.
"""
Expand All @@ -278,6 +416,44 @@ def _iter1(self, offset, length):
return self._struct_tuple_iter(offset, length)


def _get_tzinfo(tz_string, strategy=None):
import re
from datetime import timedelta, timezone

# We can handle UTC without any imports
if re.search(r"^utc$", tz_string, re.IGNORECASE):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you moved to this more complex check compared to tz_string.upper() == "UTC": ? Are there cases we would miss?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it back! It was a paper-thin theory that maybe it was less confusing to use re since it was used a few lines later, too.

return timezone.utc

# Arrow also allows fixed-offset in the form +HH:MM
maybe_fixed_offset = re.search(r"^([+-])([0-9]{2}):([0-9]{2})$", tz_string)
if maybe_fixed_offset:
sign, hours, minutes = maybe_fixed_offset.groups()
sign = 1 if sign == "+" else -1
return timezone(sign * timedelta(hours=int(hours), minutes=int(minutes)))

# Try zoneinfo.ZoneInfo() (Python 3.9+)
if strategy is None or "zoneinfo" in strategy:
try:
from zoneinfo import ZoneInfo

return ZoneInfo(tz_string)
except ImportError:
pass
jorisvandenbossche marked this conversation as resolved.
Show resolved Hide resolved

# Try dateutil.tz.gettz()
if strategy is None or "dateutil" in strategy:
try:
from dateutil.tz import gettz

return gettz(tz_string)
except ImportError:
pass

raise RuntimeError(
"zoneinfo (Python 3.9+) or dateutil is required to resolve timezone"
)


_ITEMS_ITER_LOOKUP = {
CArrowType.BINARY: "_binary_iter",
CArrowType.LARGE_BINARY: "_binary_iter",
Expand All @@ -288,6 +464,12 @@ def _iter1(self, offset, length):
CArrowType.LARGE_LIST: "_list_iter",
CArrowType.FIXED_SIZE_LIST: "_fixed_size_list_iter",
CArrowType.DICTIONARY: "_dictionary_iter",
CArrowType.DATE32: "_date_iter",
CArrowType.DATE64: "_date_iter",
CArrowType.TIME32: "_time_iter",
CArrowType.TIME64: "_time_iter",
CArrowType.TIMESTAMP: "_timestamp_iter",
CArrowType.DURATION: "_duration_iter",
}

_PRIMITIVE_TYPE_NAMES = [
Expand Down
Loading
Loading