From 18729f6ae500c4f2c52612f6224fe02647ce8ba8 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Fri, 22 Mar 2019 09:33:37 -0700
Subject: [PATCH 1/3] Use temporary file in load_table_from_dataframe

This fixes a bug where `load_table_from_dataframe` could not be used
with the `fastparquet` library. It should also use less memory when
uploading large dataframes.
---
 bigquery/google/cloud/bigquery/client.py | 40 +++++++++++++++---------
 bigquery/setup.py                        |  1 +
 bigquery/tests/unit/test_client.py       | 14 +++------
 3 files changed, 31 insertions(+), 24 deletions(-)
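The diff below replaces the in-memory `six.BytesIO()` buffer with a named
temporary file: `fastparquet`, unlike `pyarrow`, expects a filesystem path
rather than a file-like object, and writing to disk avoids holding the whole
parquet payload in memory. A minimal sketch of the pattern, where `upload`
is a hypothetical stand-in for `Client.load_table_from_file`:

    import os
    import tempfile

    def write_and_upload(dataframe, upload):
        # Create a named file so engines that only accept paths (such as
        # fastparquet) can write to it; close our descriptor because pandas
        # reopens the path itself.
        tmpfd, tmppath = tempfile.mkstemp(suffix=".parquet")
        os.close(tmpfd)
        try:
            # Dispatches to pyarrow or fastparquet via pandas' parquet engine.
            dataframe.to_parquet(tmppath)
            with open(tmppath, "rb") as parquet_file:
                return upload(parquet_file)
        finally:
            os.remove(tmppath)  # always clean up the temporary file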
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 95d49e12968a..090e6ecc4397 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -24,6 +24,7 @@
 import functools
 import gzip
 import os
+import tempfile
 import uuid
 
 import six
@@ -1123,11 +1124,10 @@ def load_table_from_dataframe(
 
         Raises:
             ImportError:
-                If a usable parquet engine cannot be found. This method
-                requires :mod:`pyarrow` to be installed.
+                If a usable parquet engine cannot be found. requires
+                :mod:`pyarrow` or :mod:`fastparquet` to be installed.
         """
-        buffer = six.BytesIO()
-        dataframe.to_parquet(buffer)
+        job_id = _make_job_id(job_id, job_id_prefix)
 
         if job_config is None:
             job_config = job.LoadJobConfig()
@@ -1136,17 +1136,27 @@
         if location is None:
             location = self.location
 
-        return self.load_table_from_file(
-            buffer,
-            destination,
-            num_retries=num_retries,
-            rewind=True,
-            job_id=job_id,
-            job_id_prefix=job_id_prefix,
-            location=location,
-            project=project,
-            job_config=job_config,
-        )
+        tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
+        os.close(tmpfd)
+
+        try:
+            dataframe.to_parquet(tmppath)
+
+            with open(tmppath, "rb") as parquet_file:
+                return self.load_table_from_file(
+                    parquet_file,
+                    destination,
+                    num_retries=num_retries,
+                    rewind=True,
+                    job_id=job_id,
+                    job_id_prefix=job_id_prefix,
+                    location=location,
+                    project=project,
+                    job_config=job_config,
+                )
+
+        finally:
+            os.remove(tmppath)
 
     def _do_resumable_upload(self, stream, metadata, num_retries):
         """Perform a resumable upload.
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 7cd901917e4c..6b4edaf561c0 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -39,6 +39,7 @@
     # Exclude PyArrow dependency from Windows Python 2.7.
     'pyarrow: platform_system != "Windows" or python_version >= "3.4"': 'pyarrow>=0.4.1',
+    'fastparquet': ['fastparquet', 'python-snappy'],
 }
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index a98ee79aa116..794b76a0a9f4 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -4658,7 +4658,7 @@ def test_load_table_from_dataframe(self):
             self.TABLE_REF,
             num_retries=_DEFAULT_NUM_RETRIES,
             rewind=True,
-            job_id=None,
+            job_id=mock.ANY,
             job_id_prefix=None,
             location=None,
             project=None,
@@ -4666,9 +4666,7 @@
         )
 
         sent_file = load_table_from_file.mock_calls[0][1][1]
-        sent_bytes = sent_file.getvalue()
-        assert isinstance(sent_bytes, bytes)
-        assert len(sent_bytes) > 0
+        assert sent_file.closed
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
         assert sent_config.source_format == job.SourceFormat.PARQUET
@@ -4695,7 +4693,7 @@ def test_load_table_from_dataframe_w_client_location(self):
             self.TABLE_REF,
             num_retries=_DEFAULT_NUM_RETRIES,
             rewind=True,
-            job_id=None,
+            job_id=mock.ANY,
             job_id_prefix=None,
             location=self.LOCATION,
             project=None,
@@ -4703,9 +4701,7 @@
         )
 
         sent_file = load_table_from_file.mock_calls[0][1][1]
-        sent_bytes = sent_file.getvalue()
-        assert isinstance(sent_bytes, bytes)
-        assert len(sent_bytes) > 0
+        assert sent_file.closed
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
         assert sent_config.source_format == job.SourceFormat.PARQUET
@@ -4735,7 +4731,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
             self.TABLE_REF,
             num_retries=_DEFAULT_NUM_RETRIES,
             rewind=True,
-            job_id=None,
+            job_id=mock.ANY,
             job_id_prefix=None,
             location=self.LOCATION,
             project=None,

From 4bca63fc0d2521bf896e0b81257db760eb94f8b7 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Fri, 22 Mar 2019 14:08:31 -0700
Subject: [PATCH 2/3] Add tests using the fastparquet engine.

---
 bigquery/docs/snippets.py | 15 +++++++++++++--
 bigquery/noxfile.py       |  2 +-
 2 files changed, 14 insertions(+), 3 deletions(-)
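The snippets test below selects the engine through pandas' `io.parquet.engine`
option rather than duplicating the test body per engine. The same pattern in a
self-contained form (a hypothetical round-trip test, not the snippet itself):

    import pandas
    import pytest

    @pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
    def test_parquet_round_trip(tmp_path, parquet_engine):
        # Skip rather than fail when an engine is not installed.
        pytest.importorskip(parquet_engine)
        pandas.set_option("io.parquet.engine", parquet_engine)

        dataframe = pandas.DataFrame({"x": [1, 2, 3]})
        path = str(tmp_path / "round_trip.parquet")
        dataframe.to_parquet(path)  # pandas dispatches to the selected engine

        assert pandas.read_parquet(path)["x"].tolist() == [1, 2, 3]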
diff --git a/bigquery/docs/snippets.py b/bigquery/docs/snippets.py
index 5356700a495a..00569c40af18 100644
--- a/bigquery/docs/snippets.py
+++ b/bigquery/docs/snippets.py
@@ -30,6 +30,10 @@
 import pytest
 import six
 
+try:
+    import fastparquet
+except (ImportError, AttributeError):
+    fastparquet = None
 try:
     import pandas
 except (ImportError, AttributeError):
@@ -3108,8 +3112,15 @@ def test_list_rows_as_dataframe(client):
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
-def test_load_table_from_dataframe(client, to_delete):
+@pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
+def test_load_table_from_dataframe(client, to_delete, parquet_engine):
+    if parquet_engine == "pyarrow" and pyarrow is None:
+        pytest.skip("Requires `pyarrow`")
+    if parquet_engine == "fastparquet" and fastparquet is None:
+        pytest.skip("Requires `fastparquet`")
+
+    pandas.set_option("io.parquet.engine", parquet_engine)
+
     dataset_id = "load_table_from_dataframe_{}".format(_millis())
     dataset = bigquery.Dataset(client.dataset(dataset_id))
     client.create_dataset(dataset)
diff --git a/bigquery/noxfile.py b/bigquery/noxfile.py
index 089a82375606..cd7db84e843a 100644
--- a/bigquery/noxfile.py
+++ b/bigquery/noxfile.py
@@ -123,7 +123,7 @@ def snippets(session):
     session.install('-e', local_dep)
     session.install('-e', os.path.join('..', 'storage'))
     session.install('-e', os.path.join('..', 'test_utils'))
-    session.install('-e', '.[pandas, pyarrow]')
+    session.install('-e', '.[pandas, pyarrow, fastparquet]')
 
     # Run py.test against the snippets tests.
     session.run(

From 3f944769df446698001c748c2d23693e41a8012c Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Fri, 22 Mar 2019 14:11:24 -0700
Subject: [PATCH 3/3] Docs typo.

---
 bigquery/google/cloud/bigquery/client.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 090e6ecc4397..7fe412478bfa 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -1124,8 +1124,9 @@ def load_table_from_dataframe(
 
         Raises:
             ImportError:
-                If a usable parquet engine cannot be found. requires
-                :mod:`pyarrow` or :mod:`fastparquet` to be installed.
+                If a usable parquet engine cannot be found. This method
+                requires :mod:`pyarrow` or :mod:`fastparquet` to be
+                installed.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
 
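After this series, either parquet engine works end to end. A short usage
example of the fixed entry point (dataset and table names are illustrative;
assumes application default credentials and an existing dataset):

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    dataframe = pandas.DataFrame({"name": ["one", "two"], "score": [1.0, 2.0]})
    table_ref = client.dataset("my_dataset").table("scores")
    load_job = client.load_table_from_dataframe(dataframe, table_ref)
    load_job.result()  # block until the load job completes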