deps!: BigQuery Storage and pyarrow are required dependencies #776

Merged Jul 27, 2021 · 27 commits

Commits
ba49697
process: make BQ Storage and pyarrow required
plamut Jul 14, 2021
2c79152
Make pyarrow required in _pandas_helpers.py
plamut Jul 14, 2021
137ae92
Make pyarrow required in client.py
plamut Jul 15, 2021
42c9606
Make pyarrow required in table.py
plamut Jul 16, 2021
5ae720d
Make pyarrow required in job/query.py
plamut Jul 16, 2021
ede0313
Make pyarrow required in DBAPI tests
plamut Jul 16, 2021
ce7f93d
Make pyarrow required in snippets tests
plamut Jul 16, 2021
3f7c456
Make BQ storage required in client.py
plamut Jul 16, 2021
ed412c0
Make BQ storage required in table.py
plamut Jul 16, 2021
ef905c2
Make BQ storage required in DB API tests
plamut Jul 16, 2021
9c106f3
Make BQ storage required in magics.py
plamut Jul 16, 2021
5438239
Make BQ storage required in test__helpers.py
plamut Jul 16, 2021
cfcc2d3
Make BQ storage required in test__pandas_helpers.py
plamut Jul 16, 2021
86c4533
Make BQ storage required in test_query_pandas.py
plamut Jul 16, 2021
560ccd6
Make method signatures compatible again
plamut Jul 16, 2021
e658d5a
Remove checks for minimum BQ Storage version
plamut Jul 19, 2021
777b850
Merge branch 'master' into iss-757
plamut Jul 20, 2021
70572d8
Remove LegacyBigQueryStorageError
plamut Jul 20, 2021
be5cf32
Bump minimum pyarrow version to 3.0.0
plamut Jul 21, 2021
7960729
Remove unneeded pytest.importorskip for BQ Storage
plamut Jul 21, 2021
b5475d5
Merge branch 'master' into iss-757
plamut Jul 22, 2021
c0b810b
Remove pyarrow version checks in pandas helpers tests
plamut Jul 22, 2021
8d127cf
Conditionally skip pandas tests where needed
plamut Jul 22, 2021
e47f1bd
Remove unneeded conditional pyarrow version paths
plamut Jul 22, 2021
427665e
Cover schema autodetect failed code path in test
plamut Jul 22, 2021
820b092
Merge branch 'master' into iss-757
tswast Jul 27, 2021
c4315ce
fix bad merge
tswast Jul 27, 2021
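Taken together, these commits make the fast DataFrame path unconditional. A minimal sketch of the user-visible effect (assumes default credentials and a project with BigQuery enabled):

from google.cloud import bigquery

client = bigquery.Client()

# google-cloud-bigquery-storage and pyarrow are now hard dependencies, so
# the BigQuery Storage read path and the Arrow-based conversion are always
# importable -- previously they were optional extras.
df = client.query("SELECT 1 AS x, 'a' AS y").to_dataframe()
print(df.dtypes)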
4 changes: 0 additions & 4 deletions docs/snippets.py
@@ -30,10 +30,6 @@
import pandas
except (ImportError, AttributeError):
pandas = None
try:
import pyarrow
except (ImportError, AttributeError):
pyarrow = None

from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
3 changes: 0 additions & 3 deletions google/cloud/bigquery/__init__.py
@@ -42,7 +42,6 @@
from google.cloud.bigquery.enums import KeyResultStatementKind
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
@@ -171,8 +170,6 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Custom exceptions
"LegacyBigQueryStorageError",
]


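Because LegacyBigQueryStorageError is deleted rather than deprecated, any caller that imported it breaks at import time. A sketch of the shim a downstream project might need (names as in the diff above):

# Old handler code imported the exception from the top-level package.
# After this PR that import raises ImportError, so guard it:
try:
    from google.cloud.bigquery import LegacyBigQueryStorageError
except ImportError:  # removed in this release; the check it backed is gone too
    LegacyBigQueryStorageError = None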
26 changes: 0 additions & 26 deletions google/cloud/bigquery/_helpers.py
@@ -28,8 +28,6 @@
from google.cloud._helpers import _to_bytes
import packaging.version

from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError


_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
@@ -41,7 +39,6 @@
re.VERBOSE,
)

_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")
_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")


@@ -75,29 +72,6 @@ def is_read_session_optional(self) -> bool:
"""
return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION

def verify_version(self):
"""Verify that a recent enough version of BigQuery Storage extra is
installed.

The function assumes that google-cloud-bigquery-storage extra is
installed, and should thus be used in places where this assumption
holds.

Because `pip` can install an outdated version of this extra despite the
constraints in `setup.py`, the calling code can use this helper to
verify the version compatibility at runtime.

Raises:
LegacyBigQueryStorageError:
If the google-cloud-bigquery-storage package is outdated.
"""
if self.installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= 2.0.0 (version found: {self.installed_version})."
)
raise LegacyBigQueryStorageError(msg)


BQ_STORAGE_VERSIONS = BQStorageVersions()

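With verify_version() and _MIN_BQ_STORAGE_VERSION removed, the version floor is enforced only by setup.py, and BQStorageVersions keeps a single runtime probe. A minimal sketch of the surviving check (a private module, shown purely for illustration):

from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS

# The outdated-dependency error path is gone; the one remaining runtime
# question is whether bigquery-storage >= 2.6.0 is installed, i.e. whether
# the read session argument may be omitted.
if BQ_STORAGE_VERSIONS.is_read_session_optional:
    print("installed bigquery-storage can skip the read session")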
122 changes: 48 additions & 74 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -20,18 +20,13 @@
import queue
import warnings

from packaging import version

try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

try:
import pyarrow
import pyarrow.parquet
except ImportError: # pragma: NO COVER
pyarrow = None
import pyarrow
import pyarrow.parquet

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
@@ -106,63 +101,52 @@ def pyarrow_timestamp():
return pyarrow.timestamp("us", tz="UTC")


if pyarrow:
# This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py
# When modifying it be sure to update it there as well.
BQ_TO_ARROW_SCALARS = {
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
"BYTES": pyarrow.binary,
"DATE": pyarrow.date32,
"DATETIME": pyarrow_datetime,
"FLOAT": pyarrow.float64,
"FLOAT64": pyarrow.float64,
"GEOGRAPHY": pyarrow.string,
"INT64": pyarrow.int64,
"INTEGER": pyarrow.int64,
"NUMERIC": pyarrow_numeric,
"STRING": pyarrow.string,
"TIME": pyarrow_time,
"TIMESTAMP": pyarrow_timestamp,
}
ARROW_SCALAR_IDS_TO_BQ = {
# https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
pyarrow.bool_().id: "BOOL",
pyarrow.int8().id: "INT64",
pyarrow.int16().id: "INT64",
pyarrow.int32().id: "INT64",
pyarrow.int64().id: "INT64",
pyarrow.uint8().id: "INT64",
pyarrow.uint16().id: "INT64",
pyarrow.uint32().id: "INT64",
pyarrow.uint64().id: "INT64",
pyarrow.float16().id: "FLOAT64",
pyarrow.float32().id: "FLOAT64",
pyarrow.float64().id: "FLOAT64",
pyarrow.time32("ms").id: "TIME",
pyarrow.time64("ns").id: "TIME",
pyarrow.timestamp("ns").id: "TIMESTAMP",
pyarrow.date32().id: "DATE",
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
}

if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"
_BIGNUMERIC_SUPPORT = True
else:
_BIGNUMERIC_SUPPORT = False

else: # pragma: NO COVER
BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
_BIGNUMERIC_SUPPORT = False # pragma: NO COVER
# This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py
# When modifying it be sure to update it there as well.
BQ_TO_ARROW_SCALARS = {
"BIGNUMERIC": pyarrow_bignumeric,
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
"BYTES": pyarrow.binary,
"DATE": pyarrow.date32,
"DATETIME": pyarrow_datetime,
"FLOAT": pyarrow.float64,
"FLOAT64": pyarrow.float64,
"GEOGRAPHY": pyarrow.string,
"INT64": pyarrow.int64,
"INTEGER": pyarrow.int64,
"NUMERIC": pyarrow_numeric,
"STRING": pyarrow.string,
"TIME": pyarrow_time,
"TIMESTAMP": pyarrow_timestamp,
}
ARROW_SCALAR_IDS_TO_BQ = {
# https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
pyarrow.bool_().id: "BOOL",
pyarrow.int8().id: "INT64",
pyarrow.int16().id: "INT64",
pyarrow.int32().id: "INT64",
pyarrow.int64().id: "INT64",
pyarrow.uint8().id: "INT64",
pyarrow.uint16().id: "INT64",
pyarrow.uint32().id: "INT64",
pyarrow.uint64().id: "INT64",
pyarrow.float16().id: "FLOAT64",
pyarrow.float32().id: "FLOAT64",
pyarrow.float64().id: "FLOAT64",
pyarrow.time32("ms").id: "TIME",
pyarrow.time64("ns").id: "TIME",
pyarrow.timestamp("ns").id: "TIMESTAMP",
pyarrow.date32().id: "DATE",
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
pyarrow.decimal256(76, scale=38).id: "BIGNUMERIC",
}


def bq_to_arrow_struct_data_type(field):
@@ -346,13 +330,6 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# If schema detection was not successful for all columns, also try with
# pyarrow, if available.
if unknown_type_fields:
if not pyarrow:
msg = u"Could not determine the type of columns: {}".format(
", ".join(field.name for field in unknown_type_fields)
)
warnings.warn(msg)
return None # We cannot detect the schema in full.

# The augment_schema() helper itself will also issue unknown type
# warnings if detection still fails for any of the fields.
bq_schema_out = augment_schema(dataframe, bq_schema_out)
@@ -494,9 +471,6 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
serializing method. Defaults to "SNAPPY".
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
"""
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
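Two consequences of this file's diff are worth spelling out: the scalar mappings are now built unconditionally (BIGNUMERIC included, since pyarrow >= 3.0.0 always provides decimal256), and dataframe_to_parquet no longer raises when pyarrow is absent. A short sketch against these private helpers (paths and names as in the diff):

import pandas

from google.cloud.bigquery import SchemaField, _pandas_helpers

# No more version branch: BIGNUMERIC maps straight to decimal256.
print(_pandas_helpers.BQ_TO_ARROW_SCALARS["BIGNUMERIC"]())  # decimal256(76, 38)

# The "pyarrow is required" ValueError is deleted; the helper just writes.
df = pandas.DataFrame({"name": ["a", "b"], "score": [1.5, 2.5]})
schema = [SchemaField("name", "STRING"), SchemaField("score", "FLOAT64")]
_pandas_helpers.dataframe_to_parquet(df, schema, "/tmp/scores.parquet")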
67 changes: 7 additions & 60 deletions google/cloud/bigquery/client.py
@@ -27,19 +27,11 @@
import json
import math
import os
import packaging.version
import tempfile
from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
import uuid
import warnings

try:
import pyarrow

_PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
except ImportError: # pragma: NO COVER
pyarrow = None

from google import resumable_media # type: ignore
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload
@@ -53,26 +45,21 @@
from google.cloud import exceptions # pytype: disable=import-error
from google.cloud.client import ClientWithProject # pytype: disable=import-error

try:
from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)
except ImportError:
DEFAULT_BQSTORAGE_CLIENT_INFO = None
from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)

from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.job import (
@@ -121,9 +108,6 @@
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120

# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])


class Project(object):
"""Wrapper for resource describing a BigQuery project.
@@ -483,17 +467,10 @@ def _ensure_bqstorage_client(
) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
"""Create a BigQuery Storage API client using this client's credentials.

If a client cannot be created due to a missing or outdated dependency
`google-cloud-bigquery-storage`, raise a warning and return ``None``.

If the `bqstorage_client` argument is not ``None``, still perform the version
check and return the argument back to the caller if the check passes. If it
fails, raise a warning and return ``None``.

Args:
bqstorage_client:
An existing BigQuery Storage client instance to check for version
compatibility. If ``None``, a new instance is created and returned.
An existing BigQuery Storage client instance. If ``None``, a new
instance is created and returned.
client_options:
Custom options used with a new BigQuery Storage client instance if one
is created.
@@ -504,20 +481,7 @@
Returns:
A BigQuery Storage API client.
"""
try:
from google.cloud import bigquery_storage
except ImportError:
warnings.warn(
"Cannot create BigQuery Storage client, the dependency "
"google-cloud-bigquery-storage is not installed."
)
return None

try:
BQ_STORAGE_VERSIONS.verify_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None
from google.cloud import bigquery_storage

if bqstorage_client is None:
bqstorage_client = bigquery_storage.BigQueryReadClient(
@@ -2496,7 +2460,7 @@ def load_table_from_dataframe(
:attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.
Indexes are not loaded.

By default, this method uses the parquet source format. To
override this, supply a value for
@@ -2526,9 +2490,6 @@
google.cloud.bigquery.job.LoadJob: A new load job.

Raises:
ValueError:
If a usable parquet engine cannot be found. This method
requires :mod:`pyarrow` to be installed.
TypeError:
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
class.
@@ -2556,10 +2517,6 @@
)
)

if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

if location is None:
location = self.location

@@ -2615,16 +2572,6 @@ def load_table_from_dataframe(
try:

if job_config.source_format == job.SourceFormat.PARQUET:
if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
msg = (
"Loading dataframe data in PARQUET format with pyarrow "
f"{_PYARROW_VERSION} can result in data corruption. It is "
"therefore *strongly* advised to use a different pyarrow "
"version or a different source format. "
"See: https://github.com/googleapis/python-bigquery/issues/781"
)
warnings.warn(msg, category=RuntimeWarning)
plamut (PR author) commented: This warning was removed because the just-fixed bug will not come into play once we require pyarrow >= 3.0.0.


if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
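Net effect on the public client: load_table_from_dataframe drops both the "requires pyarrow" ValueError and the pyarrow 2.0.0 corruption RuntimeWarning, since neither situation can occur with pyarrow >= 3.0.0 pinned. A hedged usage sketch (the table ID is hypothetical):

import pandas
from google.cloud import bigquery

client = bigquery.Client()
df = pandas.DataFrame({"id": [1, 2], "name": ["alpha", "beta"]})

# PARQUET stays the default source format; the former missing-engine and
# bad-pyarrow-version code paths no longer exist.
load_job = client.load_table_from_dataframe(df, "my_dataset.my_table")
load_job.result()  # block until the load job completes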