
chore: consolidate PyarrowVersions helpers #1679

Merged: 16 commits, Oct 18, 2023
79 changes: 4 additions & 75 deletions google/cloud/bigquery/_helpers.py
@@ -20,7 +20,7 @@
import math
import re
import os
from typing import Any, Optional, Union
from typing import Optional, Union

from dateutil import relativedelta
from google.cloud._helpers import UTC # type: ignore
@@ -32,10 +32,7 @@

import packaging.version

from google.cloud.bigquery.exceptions import (
LegacyBigQueryStorageError,
LegacyPyarrowError,
)
from google.cloud.bigquery import exceptions

_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
@@ -57,8 +54,6 @@

_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")

_MIN_PYARROW_VERSION = packaging.version.Version("3.0.0")

_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")

BIGQUERY_EMULATOR_HOST = "BIGQUERY_EMULATOR_HOST"
@@ -115,84 +110,18 @@ def verify_version(self):
verify the version compatibility at runtime.

Raises:
LegacyBigQueryStorageError:
exceptions.LegacyBigQueryStorageError:
If the google-cloud-bigquery-storage package is outdated.
"""
if self.installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= {_MIN_BQ_STORAGE_VERSION} (version found: {self.installed_version})."
)
raise LegacyBigQueryStorageError(msg)


class PyarrowVersions:
"""Version comparisons for pyarrow package."""

def __init__(self):
self._installed_version = None

@property
def installed_version(self) -> packaging.version.Version:
"""Return the parsed version of pyarrow."""
if self._installed_version is None:
import pyarrow # type: ignore

self._installed_version = packaging.version.parse(
# Use 0.0.0, since it is earlier than any released version.
# Legacy versions also have the same property, but
# creating a LegacyVersion has been deprecated.
# https://github.com/pypa/packaging/issues/321
getattr(pyarrow, "__version__", "0.0.0")
)

return self._installed_version

@property
def use_compliant_nested_type(self) -> bool:
return self.installed_version.major >= 4

def try_import(self, raise_if_error: bool = False) -> Any:
"""Verify that a recent enough version of pyarrow extra is
installed.

The function assumes that pyarrow extra is installed, and should thus
be used in places where this assumption holds.

Because `pip` can install an outdated version of this extra despite the
constraints in `setup.py`, the calling code can use this helper to
verify the version compatibility at runtime.

Returns:
The ``pyarrow`` module or ``None``.

Raises:
LegacyPyarrowError:
If the pyarrow package is outdated and ``raise_if_error`` is ``True``.
"""
try:
import pyarrow
except ImportError as exc: # pragma: NO COVER
if raise_if_error:
raise LegacyPyarrowError(
f"pyarrow package not found. Install pyarrow version >= {_MIN_PYARROW_VERSION}."
) from exc
return None

if self.installed_version < _MIN_PYARROW_VERSION:
if raise_if_error:
msg = (
"Dependency pyarrow is outdated, please upgrade "
f"it to version >= {_MIN_PYARROW_VERSION} (version found: {self.installed_version})."
)
raise LegacyPyarrowError(msg)
return None

return pyarrow
raise exceptions.LegacyBigQueryStorageError(msg)


BQ_STORAGE_VERSIONS = BQStorageVersions()
PYARROW_VERSIONS = PyarrowVersions()


def _not_null(value, field):
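Editor's note: a minimal sketch of how the surviving singleton is meant to be consumed at runtime, assuming google-cloud-bigquery is installed. `can_use_bqstorage` is a hypothetical wrapper, and `_helpers` is an internal module, so this is illustrative only.

```python
# Sketch: guarding a BigQuery Storage code path at runtime.
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError


def can_use_bqstorage() -> bool:
    """Return True only if a compatible google-cloud-bigquery-storage is present."""
    try:
        _helpers.BQ_STORAGE_VERSIONS.verify_version()
    except (ImportError, LegacyBigQueryStorageError):
        # The extra is missing, or it is older than _MIN_BQ_STORAGE_VERSION.
        return False
    return True
```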
103 changes: 14 additions & 89 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -23,9 +23,9 @@
import warnings
from typing import Any, Union

from packaging import version

from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema

try:
@@ -49,7 +49,11 @@
db_dtypes_import_exception = exc
date_dtype_name = time_dtype_name = "" # Use '' rather than None because pytype

pyarrow = _helpers.PYARROW_VERSIONS.try_import()
pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()

_BIGNUMERIC_SUPPORT = False
if pyarrow is not None:
_BIGNUMERIC_SUPPORT = True
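Editor's note: the module must import cleanly even when the pyarrow extra is absent, which is why `try_import` degrades to `None`. A minimal sketch of that guarded-import pattern, with an illustrative flag name:

```python
# Sketch: optional-dependency guard, equivalent in spirit to the
# try_import() call above. FEATURE_AVAILABLE is a hypothetical name.
try:
    import pyarrow  # type: ignore
except ImportError:
    pyarrow = None

FEATURE_AVAILABLE = pyarrow is not None
```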

try:
    # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`
@@ -119,87 +123,6 @@ def __init__(self):
self.done = False


def pyarrow_datetime():
return pyarrow.timestamp("us", tz=None)


def pyarrow_numeric():
return pyarrow.decimal128(38, 9)


def pyarrow_bignumeric():
# 77th digit is partial.
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
return pyarrow.decimal256(76, 38)


def pyarrow_time():
return pyarrow.time64("us")


def pyarrow_timestamp():
return pyarrow.timestamp("us", tz="UTC")


if pyarrow:
    # This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py
    # When modifying it, be sure to update it there as well.
BQ_TO_ARROW_SCALARS = {
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
"BYTES": pyarrow.binary,
"DATE": pyarrow.date32,
"DATETIME": pyarrow_datetime,
"FLOAT": pyarrow.float64,
"FLOAT64": pyarrow.float64,
"GEOGRAPHY": pyarrow.string,
"INT64": pyarrow.int64,
"INTEGER": pyarrow.int64,
"NUMERIC": pyarrow_numeric,
"STRING": pyarrow.string,
"TIME": pyarrow_time,
"TIMESTAMP": pyarrow_timestamp,
}
ARROW_SCALAR_IDS_TO_BQ = {
# https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
pyarrow.bool_().id: "BOOL",
pyarrow.int8().id: "INT64",
pyarrow.int16().id: "INT64",
pyarrow.int32().id: "INT64",
pyarrow.int64().id: "INT64",
pyarrow.uint8().id: "INT64",
pyarrow.uint16().id: "INT64",
pyarrow.uint32().id: "INT64",
pyarrow.uint64().id: "INT64",
pyarrow.float16().id: "FLOAT64",
pyarrow.float32().id: "FLOAT64",
pyarrow.float64().id: "FLOAT64",
pyarrow.time32("ms").id: "TIME",
pyarrow.time64("ns").id: "TIME",
pyarrow.timestamp("ns").id: "TIMESTAMP",
pyarrow.date32().id: "DATE",
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
}

if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"
_BIGNUMERIC_SUPPORT = True
else:
_BIGNUMERIC_SUPPORT = False # pragma: NO COVER

else: # pragma: NO COVER
BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
_BIGNUMERIC_SUPPORT = False # pragma: NO COVER


BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
"GEOGRAPHY": {
b"ARROW:extension:name": b"google:sqlType:geography",
@@ -240,7 +163,7 @@ def bq_to_arrow_data_type(field):
if field_type_upper in schema._STRUCT_TYPES:
return bq_to_arrow_struct_data_type(field)

data_type_constructor = BQ_TO_ARROW_SCALARS.get(field_type_upper)
data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper)
if data_type_constructor is None:
return None
return data_type_constructor()
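Editor's note: the scalar lookup now goes through `_pyarrow_helpers.bq_to_arrow_scalars`, which returns a type constructor or `None`. A short usage sketch (internal API, assuming the pyarrow extra is installed):

```python
# Sketch: resolving a BigQuery scalar type name to a pyarrow type.
from google.cloud.bigquery import _pyarrow_helpers

ctor = _pyarrow_helpers.bq_to_arrow_scalars("NUMERIC")
if ctor is None:
    raise ValueError("no Arrow equivalent for this BigQuery type")
arrow_type = ctor()  # pyarrow.decimal128(38, 9)
```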
@@ -568,7 +491,7 @@ def augment_schema(dataframe, current_bq_schema):
if pyarrow.types.is_list(arrow_table.type):
# `pyarrow.ListType`
detected_mode = "REPEATED"
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)
detected_type = _pyarrow_helpers.arrow_scalar_ids_to_bq(
arrow_table.values.type.id
)

# For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
# it to such datetimes, causing them to be recognized as TIMESTAMP type.
@@ -584,7 +509,7 @@
detected_type = "DATETIME"
else:
detected_mode = field.mode
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)
detected_type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)

if detected_type is None:
unknown_type_fields.append(field)
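Editor's note: `augment_schema` performs the reverse lookup keyed on the Arrow type id. A small sketch of that mapping, assuming the pyarrow extra is installed:

```python
# Sketch: mapping pyarrow type ids back to BigQuery type names.
import pyarrow as pa

from google.cloud.bigquery import _pyarrow_helpers

assert _pyarrow_helpers.arrow_scalar_ids_to_bq(pa.int64().id) == "INT64"
assert _pyarrow_helpers.arrow_scalar_ids_to_bq(pa.date64().id) == "DATETIME"
# Unmapped ids return None; augment_schema then reports the field as unknown.
assert _pyarrow_helpers.arrow_scalar_ids_to_bq(pa.null().id) is None
```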
@@ -705,13 +630,13 @@ def dataframe_to_parquet(

This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
"""
pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)

import pyarrow.parquet # type: ignore

kwargs = (
{"use_compliant_nested_type": parquet_use_compliant_nested_type}
if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
else {}
)

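Editor's note: the conditional-kwargs pattern in `dataframe_to_parquet`, isolated into a runnable sketch; the sample table and output path are illustrative assumptions.

```python
# Sketch: pass use_compliant_nested_type only when the installed
# pyarrow (>= 4.0.0) understands it; otherwise send no extra kwargs.
from google.cloud.bigquery import _versions_helpers

pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)

import pyarrow.parquet  # noqa: E402

table = pyarrow.Table.from_pydict({"points": [[1, 2], [3]]})  # list<int64>
kwargs = (
    {"use_compliant_nested_type": True}
    if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
    else {}
)
pyarrow.parquet.write_table(table, "points.parquet", **kwargs)
```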
123 changes: 123 additions & 0 deletions google/cloud/bigquery/_pyarrow_helpers.py
@@ -0,0 +1,123 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for connecting BigQuery and pyarrow."""

from typing import Any

from packaging import version

try:
import pyarrow # type: ignore
except ImportError: # pragma: NO COVER
pyarrow = None


def pyarrow_datetime():
return pyarrow.timestamp("us", tz=None)


def pyarrow_numeric():
return pyarrow.decimal128(38, 9)


def pyarrow_bignumeric():
# 77th digit is partial.
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
return pyarrow.decimal256(76, 38)


def pyarrow_time():
return pyarrow.time64("us")


def pyarrow_timestamp():
return pyarrow.timestamp("us", tz="UTC")


_BQ_TO_ARROW_SCALARS = {}
_ARROW_SCALAR_IDS_TO_BQ = {}

if pyarrow:
    # This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py
    # When modifying it, be sure to update it there as well.
    # Note: the "BIGNUMERIC" mapping is added below, conditional on the
    # installed pyarrow version (decimal256 requires pyarrow >= 3.0.0).
_BQ_TO_ARROW_SCALARS = {
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
"BYTES": pyarrow.binary,
"DATE": pyarrow.date32,
"DATETIME": pyarrow_datetime,
"FLOAT": pyarrow.float64,
"FLOAT64": pyarrow.float64,
"GEOGRAPHY": pyarrow.string,
"INT64": pyarrow.int64,
"INTEGER": pyarrow.int64,
"NUMERIC": pyarrow_numeric,
"STRING": pyarrow.string,
"TIME": pyarrow_time,
"TIMESTAMP": pyarrow_timestamp,
}

_ARROW_SCALAR_IDS_TO_BQ = {
# https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
pyarrow.bool_().id: "BOOL",
pyarrow.int8().id: "INT64",
pyarrow.int16().id: "INT64",
pyarrow.int32().id: "INT64",
pyarrow.int64().id: "INT64",
pyarrow.uint8().id: "INT64",
pyarrow.uint16().id: "INT64",
pyarrow.uint32().id: "INT64",
pyarrow.uint64().id: "INT64",
pyarrow.float16().id: "FLOAT64",
pyarrow.float32().id: "FLOAT64",
pyarrow.float64().id: "FLOAT64",
pyarrow.time32("ms").id: "TIME",
pyarrow.time64("ns").id: "TIME",
pyarrow.timestamp("ns").id: "TIMESTAMP",
pyarrow.date32().id: "DATE",
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
}

# Adds bignumeric support only if pyarrow version >= 3.0.0
# Decimal256 support was added to arrow 3.0.0
# https://arrow.apache.org/blog/2021/01/25/3.0.0-release/
if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
_BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
_ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"


def bq_to_arrow_scalars(bq_scalar: str):
"""
Returns:
        The pyarrow type constructor that the input BigQuery scalar type
        maps to, or None if the BigQuery type is not recognized.
"""
return _BQ_TO_ARROW_SCALARS.get(bq_scalar)


def arrow_scalar_ids_to_bq(arrow_scalar: Any):
"""
Returns:
        The BigQuery scalar type name that the input pyarrow type id
        maps to, or None if the id is not recognized.
"""
return _ARROW_SCALAR_IDS_TO_BQ.get(arrow_scalar)
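Editor's note: a quick round-trip through the two new helpers (internal API; assumes pyarrow >= 3.0.0 so that BIGNUMERIC is registered):

```python
# Sketch: round-tripping a type through the new helper functions.
from google.cloud.bigquery import _pyarrow_helpers

ctor = _pyarrow_helpers.bq_to_arrow_scalars("BIGNUMERIC")  # None on pyarrow < 3.0.0
if ctor is not None:
    arrow_type = ctor()  # pyarrow.decimal256(76, 38)
    assert _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_type.id) == "BIGNUMERIC"
```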