diff --git a/ci/requirements-2.6.txt b/ci/requirements-2.6.txt
index d101ab9d6876f..117d14005e175 100644
--- a/ci/requirements-2.6.txt
+++ b/ci/requirements-2.6.txt
@@ -4,7 +4,6 @@ python-dateutil==1.5
 pytz==2013b
 http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
 html5lib==1.0b2
-bigquery==2.0.17
 numexpr==1.4.2
 sqlalchemy==0.7.1
 pymysql==0.6.0
diff --git a/ci/requirements-2.7.txt b/ci/requirements-2.7.txt
index 2e0e20b047ee0..fccc7a0ffdea7 100644
--- a/ci/requirements-2.7.txt
+++ b/ci/requirements-2.7.txt
@@ -19,5 +19,7 @@ lxml==3.2.1
 scipy==0.13.3
 beautifulsoup4==4.2.1
 statsmodels==0.5.0
-bigquery==2.0.17
 boto==2.26.1
+httplib2==0.8
+python-gflags==2.0
+google-api-python-client==1.2
\ No newline at end of file
diff --git a/doc/source/install.rst b/doc/source/install.rst
index 56ab7b70407bc..fe56b53d7cb82 100644
--- a/doc/source/install.rst
+++ b/doc/source/install.rst
@@ -112,7 +112,9 @@ Optional Dependencies
     :func:`~pandas.io.clipboard.read_clipboard`. Most package managers on Linux
     distributions will have xclip and/or xsel immediately available for
     installation.
-  * `Google bq Command Line Tool `__
+  * Google's `python-gflags` and `google-api-python-client`
+    * Needed for :mod:`~pandas.io.gbq`
+  * `httplib2`
     * Needed for :mod:`~pandas.io.gbq`
 * One of the following combinations of libraries is needed to use the
   top-level :func:`~pandas.io.html.read_html` function:
diff --git a/doc/source/io.rst b/doc/source/io.rst
index bc58b04de4473..129d74ac92df1 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -3373,83 +3373,79 @@ Google BigQuery (Experimental)
 The :mod:`pandas.io.gbq` module provides a wrapper for Google's BigQuery
 analytics web service to simplify retrieving results from BigQuery tables
 using SQL-like queries. Result sets are parsed into a pandas
-DataFrame with a shape derived from the source table. Additionally,
-DataFrames can be uploaded into BigQuery datasets as tables
-if the source datatypes are compatible with BigQuery ones.
+DataFrame with a shape and data types derived from the source table.
+Additionally, DataFrames can be appended to existing BigQuery tables if
+the destination table is the same shape as the DataFrame.

 For specifics on the service itself, see `here `__

-As an example, suppose you want to load all data from an existing table
-: `test_dataset.test_table`
-into BigQuery and pull it into a DataFrame.
+As an example, suppose you want to load all data from an existing BigQuery
+table `test_dataset.test_table` into a DataFrame using the :func:`~pandas.io.read_gbq`
+function.

 .. code-block:: python
-
-   from pandas.io import gbq

-   # Insert your BigQuery Project ID Here
-   # Can be found in the web console, or
-   # using the command line tool `bq ls`
+   # Can be found in the Google web console
    projectid = "xxxxxxxx"

-   data_frame = gbq.read_gbq('SELECT * FROM test_dataset.test_table', project_id = projectid)
+   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', project_id = projectid)
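+
+The returned DataFrame's column dtypes are derived from the BigQuery schema:
+per this module's dtype mapping, nullable ``INTEGER`` and ``FLOAT`` columns
+both come back as ``float64``, and ``TIMESTAMP`` columns as ``datetime64[ns]``
+(inspect ``data_frame.dtypes`` to confirm what your table produced).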
-The user will then be authenticated by the `bq` command line client -
-this usually involves the default browser opening to a login page,
-though the process can be done entirely from command line if necessary.
-Datasets and additional parameters can be either configured with `bq`,
-passed in as options to `read_gbq`, or set using Google's gflags (this
-is not officially supported by this module, though care was taken
-to ensure that they should be followed regardless of how you call the
-method).
+You will then be authenticated to the specified BigQuery account
+via Google's OAuth2 mechanism. In general, this is as simple as following the
+prompts in a browser window which will be opened for you. Should the browser not
+be available, or fail to launch, a code will be provided to complete the process
+manually. Additional information on the authentication mechanism can be found
+`here `__

-Additionally, you can define which column to use as an index as well as a preferred column order as follows:
+You can define which column from BigQuery to use as an index in the
+destination DataFrame as well as a preferred column order as follows:

 .. code-block:: python

-   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
+   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
                             index_col='index_column_name',
-                            col_order='[col1, col2, col3,...]', project_id = projectid)
-
-Finally, if you would like to create a BigQuery table, `my_dataset.my_table`, from the rows of DataFrame, `df`:
+                            col_order=['col1', 'col2', 'col3'], project_id = projectid)
+
+Finally, you can append data to a BigQuery table from a pandas DataFrame
+using the :func:`~pandas.io.to_gbq` function. This function uses the
+Google streaming API which requires that your destination table exists in
+BigQuery. Given that the BigQuery table already exists, your DataFrame should
+match the destination table in column order, structure, and data types.
+DataFrame indexes are not supported. By default, rows are streamed to
+BigQuery in chunks of 10,000 rows, but you can pass other chunk values
+via the ``chunksize`` argument. You can also see the progress of your
+insert via the ``verbose`` flag which defaults to ``True``. The HTTP
+response code of Google BigQuery can be successful (200) even if the
+append failed. For this reason, if there is a failure to append to the
+table, the complete error response from BigQuery is returned which
+can be quite long given it provides a status for each row. You may want
+to start with smaller chunks to test that the size and types of your
+DataFrame match your destination table to make debugging simpler.

 .. code-block:: python

    df = pandas.DataFrame({'string_col_name' : ['hello'],
                           'integer_col_name' : [1],
                           'boolean_col_name' : [True]})
-   schema = ['STRING', 'INTEGER', 'BOOLEAN']
-   data_frame = gbq.to_gbq(df, 'my_dataset.my_table',
-                           if_exists='fail', schema = schema, project_id = projectid)
-
-To add more rows to this, simply:
-
-.. code-block:: python
-
-   df2 = pandas.DataFrame({'string_col_name' : ['hello2'],
-                           'integer_col_name' : [2],
-                           'boolean_col_name' : [False]})
-   data_frame = gbq.to_gbq(df2, 'my_dataset.my_table', if_exists='append', project_id = projectid)
+   df.to_gbq('my_dataset.my_table', project_id = projectid)
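+
+For a large DataFrame, a smaller ``chunksize`` together with the default
+``verbose=True`` makes it easier to see which chunk a failed insert came from
+(a usage sketch; the table and project are the placeholders used above):
+
+.. code-block:: python
+
+   df.to_gbq('my_dataset.my_table', project_id = projectid, chunksize=500)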
-.. note::
+The BigQuery SQL query language has some oddities, see `here `__

-   A default project id can be set using the command line:
-   `bq init`.
+While BigQuery uses SQL-like syntax, it has some important differences
+from traditional databases both in functionality, API limitations (size and
+quantity of queries or uploads), and how Google charges for use of the service.
+You should refer to Google documentation often as the service seems to
+be changing and evolving. BigQuery is best for analyzing large sets of
+data quickly, but it is not a direct replacement for a transactional database.

-   There is a hard cap on BigQuery result sets, at 128MB compressed.
-   Also, the BigQuery SQL query language has some oddities,
-   see `here `__
-
-   You can access the management console to determine project id's by:
-
+You can access the management console to determine project IDs by:
+
 .. warning::

-   To use this module, you will need a BigQuery account. See
-    for details.
-
-   As of 1/28/14, a known bug is present that could possibly cause data duplication in the resultant dataframe. A fix is imminent,
-   but any client changes will not make it into 0.13.1. See:
-   http://stackoverflow.com/questions/20984592/bigquery-results-not-including-page-token/21009144?noredirect=1#comment32090677_21009144
+   To use this module, you will need a valid BigQuery account. See
+    for details on the
+   service.

 .. _io.stata:
diff --git a/doc/source/v0.14.1.txt b/doc/source/v0.14.1.txt
index 45a5d55ca047d..d5b1c96ab4e8b 100644
--- a/doc/source/v0.14.1.txt
+++ b/doc/source/v0.14.1.txt
@@ -154,14 +154,11 @@ Performance
 Experimental
 ~~~~~~~~~~~~

-``pandas.io.data.Options`` has gained a ``get_all_data method``, and now consistently returns a multi-indexed ``DataFrame`` (:issue:`5602`). See :ref:`the docs`
-
-   .. ipython:: python
-
-      from pandas.io.data import Options
-      aapl = Options('aapl', 'yahoo')
-      data = aapl.get_all_data()
-      data.iloc[0:5, 0:5]
+- ``io.gbq.read_gbq`` and ``io.gbq.to_gbq`` were refactored to remove the
+  dependency on the Google ``bq.py`` command line client. This submodule
+  now uses ``httplib2`` and the Google ``apiclient`` and ``oauth2client`` API client
+  libraries which should be more stable and, therefore, more reliable than
+  ``bq.py`` (:issue:`6937`).

 .. _whatsnew_0141.bug_fixes:
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index b4e69e2056507..dd1d87dfa468e 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -669,47 +669,43 @@ def to_dict(self, outtype='dict'):
         else:  # pragma: no cover
             raise ValueError("outtype %s not understood" % outtype)

-    def to_gbq(self, destination_table, schema=None, col_order=None,
-               if_exists='fail', **kwargs):
+    def to_gbq(self, destination_table, project_id=None, chunksize=10000,
+               verbose=True, reauth=False):
         """Write a DataFrame to a Google BigQuery table.

-        If the table exists, the DataFrame will be appended. If not, a new
-        table will be created, in which case the schema will have to be
-        specified. By default, rows will be written in the order they appear
-        in the DataFrame, though the user may specify an alternative order.
+        THIS IS AN EXPERIMENTAL LIBRARY
+
+        If the table exists, the dataframe will be written to the table using
+        the defined table schema and column types. For simplicity, this method
+        uses the Google BigQuery streaming API. The to_gbq method chunks data
+        into a default chunk size of 10,000. Failures return the complete error
+        response which can be quite long depending on the size of the insert.
+        There are several important limitations of the Google streaming API
+        which are detailed at:
+        https://developers.google.com/bigquery/streaming-data-into-bigquery.

         Parameters
-        ---------------
+        ----------
         destination_table : string
-            name of table to be written, in the form 'dataset.tablename'
-        schema : sequence (optional)
-            list of column types in order for data to be inserted, e.g.
-            ['INTEGER', 'TIMESTAMP', 'BOOLEAN']
-        col_order : sequence (optional)
-            order which columns are to be inserted, e.g. ['primary_key',
-            'birthday', 'username']
-        if_exists : {'fail', 'replace', 'append'} (optional)
-            - fail: If table exists, do nothing.
-            - replace: If table exists, drop it, recreate it, and insert data.
-            - append: If table exists, insert data. Create if does not exist.
-        kwargs are passed to the Client constructor
-
-        Raises
-        ------
-        SchemaMissing :
-            Raised if the 'if_exists' parameter is set to 'replace', but no
-            schema is specified
-        TableExists :
-            Raised if the specified 'destination_table' exists but the
-            'if_exists' parameter is set to 'fail' (the default)
-        InvalidSchema :
-            Raised if the 'schema' parameter does not match the provided
-            DataFrame
+            Name of table to be written, in the form 'dataset.tablename'
+        project_id : str
+            Google BigQuery Account project ID.
+        chunksize : int (default 10000)
+            Number of rows to be inserted in each chunk from the dataframe.
+        verbose : boolean (default True)
+            Show percentage complete
+        reauth : boolean (default False)
+            Force Google BigQuery to reauthenticate the user. This is useful
+            if multiple accounts are used.
+
         """
         from pandas.io import gbq
-        return gbq.to_gbq(self, destination_table, schema=None, col_order=None,
-                          if_exists='fail', **kwargs)
+        return gbq.to_gbq(self, destination_table, project_id=project_id,
+                          chunksize=chunksize, verbose=verbose,
+                          reauth=reauth)

     @classmethod
     def from_records(cls, data, index=None, exclude=None, columns=None,
diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py
index 60381a2a628c2..76848a62d0d5f 100644
--- a/pandas/io/gbq.py
+++ b/pandas/io/gbq.py
@@ -1,68 +1,104 @@
-"""
-Pandas module to interface with Google BigQuery.
-"""
-import os
-import sys
-import tempfile
-import csv
-import logging
 from datetime import datetime
-import pkg_resources
-from distutils.version import LooseVersion
-from pandas.compat import u
+import json
+import logging
+import sys
+from time import sleep
+import uuid

-import pandas as pd
 import numpy as np
+import pkg_resources

-from pandas.core.common import PandasError
-from pandas.core.frame import DataFrame
+from distutils.version import LooseVersion
+from pandas import compat
+from pandas.core.api import DataFrame
 from pandas.tools.merge import concat
+from pandas.core.common import PandasError

-try:
-    import bq
-    import bigquery_client
-    import gflags as flags
-    _BQ_INSTALLED = True
-    _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
-    if LooseVersion(_BQ_VERSION) >= '2.0.17':
-        _BQ_VALID_VERSION = True
-    else:
-        _BQ_VALID_VERSION = False
+_GOOGLE_API_CLIENT_INSTALLED = False
+_GOOGLE_API_CLIENT_VALID_VERSION = False
+_GOOGLE_FLAGS_INSTALLED = False
+_GOOGLE_FLAGS_VALID_VERSION = False
+_HTTPLIB2_INSTALLED = False

-except ImportError:
-    _BQ_INSTALLED = False
+if not compat.PY3:
+
+    try:
+        from apiclient.discovery import build
+        from apiclient.http import MediaFileUpload
+        from apiclient.errors import HttpError
+        from oauth2client.client import OAuth2WebServerFlow
+        from oauth2client.client import AccessTokenRefreshError
+        from oauth2client.client import flow_from_clientsecrets
+        from oauth2client.file import Storage
+        from oauth2client.tools import run
+        _GOOGLE_API_CLIENT_INSTALLED = True
+        _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution('google-api-python-client').version

-# Setup the logger
-logger = logging.getLogger('pandas.io.gbq')
+        if LooseVersion(_GOOGLE_API_CLIENT_VERSION) >= '1.2.0':
+            _GOOGLE_API_CLIENT_VALID_VERSION = True
+
+    except ImportError:
+        _GOOGLE_API_CLIENT_INSTALLED = False
+
+
+    try:
+        import gflags as flags
+        _GOOGLE_FLAGS_INSTALLED = True

-# These are some custom exceptions that the
-# to_gbq() method can throw
+        _GOOGLE_FLAGS_VERSION = pkg_resources.get_distribution('python-gflags').version
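+        # NOTE: wrap only the version string in LooseVersion() and compare the
+        # result; wrapping the whole comparison (a bool) in LooseVersion()
+        # would leave the check truthy regardless of the installed version.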
+        if LooseVersion(_GOOGLE_FLAGS_VERSION) >= '2.0.0':
+            _GOOGLE_FLAGS_VALID_VERSION = True

-class SchemaMissing(PandasError, IOError):
+    except ImportError:
+        _GOOGLE_FLAGS_INSTALLED = False
+
+    try:
+        import httplib2
+        _HTTPLIB2_INSTALLED = True
+    except ImportError:
+        _HTTPLIB2_INSTALLED = False
+
+
+logger = logging.getLogger('pandas.io.gbq')
+logger.setLevel(logging.ERROR)
+
+class InvalidPageToken(PandasError, IOError):
+    """
+    Raised when Google BigQuery fails to return,
+    or returns a duplicate page token.
+    """
+    pass
+
+class InvalidQueryException(PandasError, IOError):
     """
-    Raised when attempting to write a DataFrame to
-    a new table in Google BigQuery without specifying
-    a schema describing the DataFrame.
+    Raised when a malformed query is given to read_gbq.
     """
     pass

+class AccessDeniedException(PandasError, IOError):
+    """
+    Raised when invalid credentials are provided, or tokens have expired.
+    """
+    pass

-class InvalidSchema(PandasError, IOError):
+class NotFoundException(PandasError, IOError):
     """
-    Raised when attempting to write a DataFrame to
-    Google BigQuery with an invalid table schema.
+    Raised when the project_id/table provided in the query could not be found.
     """
     pass

+class TermsOfServiceNotAcceptedException(PandasError, IOError):
+    """
+    Raised when the terms of service were not accepted or have been unaccepted.
+    """
+    pass

-class TableExistsFail(PandasError, IOError):
+class UnknownGBQException(PandasError, IOError):
     """
-    Raised when attempting to write a DataFrame to
-    an existing Google BigQuery table without specifying
-    that a replace/update action be taken.
+    Raised when an unrecognized Google API Error occurs.
     """
     pass

@@ -75,253 +111,263 @@ class InvalidColumnOrder(PandasError, IOError):
     """
     pass

+class GbqConnector:
+    def __init__(self, project_id, reauth=False):
+        self.project_id = project_id
+        self.reauth = reauth
+        self.credentials = self.get_credentials()
+        self.service = self.get_service(self.credentials)
+
+    def get_credentials(self):
+        flow = OAuth2WebServerFlow(client_id='495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd.apps.googleusercontent.com',
+                                   client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
+                                   scope='https://www.googleapis.com/auth/bigquery',
+                                   redirect_uri='urn:ietf:wg:oauth:2.0:oob')
+
+        storage = Storage('bigquery_credentials.dat')
+        credentials = storage.get()
+
+        if credentials is None or credentials.invalid or self.reauth:
+            credentials = run(flow, storage)
+
+        return credentials
+
+    def get_service(self, credentials):
+        http = httplib2.Http()
+        http = credentials.authorize(http)
+        bigquery_service = build('bigquery', 'v2', http=http)
+
+        return bigquery_service
+
+    def run_query(self, query):
+        job_collection = self.service.jobs()
+        job_data = {
+            'configuration': {
+                'query': {
+                    'query': query
+                    # 'allowLargeResults', 'createDisposition', 'preserveNulls', destinationTable, useQueryCache
+                }
+            }
+        }
+
+        try:
+            query_reply = job_collection.insert(projectId=self.project_id,
+                                                body=job_data).execute()
+            status = query_reply['status']
+        except AccessTokenRefreshError:
+            raise AccessDeniedException("The credentials have been revoked or expired, please re-run "
+                                        "the application to re-authorize")
+        except HttpError as ex:
+            status = json.loads(ex.content)['error']
+
+        errors = status.get('errors', None)
+
+        if errors:
+            reasons = [error['reason'] for error in errors]
+            if 'accessDenied' in reasons:
+                raise AccessDeniedException
+            if 'invalidQuery' in reasons:
+                raise InvalidQueryException
+            if 'notFound' in reasons:
+                raise NotFoundException
+            if 'termsOfServiceNotAccepted' in reasons:
+                raise TermsOfServiceNotAcceptedException
+            else:
+                raise UnknownGBQException(errors)
+
+        job_reference = query_reply['jobReference']
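+        # Poll getQueryResults until BigQuery reports the job complete; the
+        # insert() reply above can arrive before the query has finished.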
+        while 'jobComplete' not in query_reply:
+            print('Job not yet complete...')
+            query_reply = job_collection.getQueryResults(
+                projectId=job_reference['projectId'],
+                jobId=job_reference['jobId']).execute()
+
+        total_rows = int(query_reply['totalRows'])
+        result_pages = list()
+        seen_page_tokens = list()
+        current_row = 0
+        # Only read schema on first page
+        schema = query_reply['schema']
+
+        # Loop through each page of data
+        while 'rows' in query_reply and current_row < total_rows:
+            page = query_reply['rows']
+            result_pages.append(page)
+            current_row += len(page)
+            page_token = query_reply.get('pageToken', None)
+
+            if not page_token and current_row < total_rows:
+                raise InvalidPageToken("Required pageToken was missing. Received {0} of {1} rows".format(current_row, total_rows))
+
+            elif page_token in seen_page_tokens:
+                raise InvalidPageToken("A duplicate pageToken was returned")
+
+            seen_page_tokens.append(page_token)
+            query_reply = job_collection.getQueryResults(
+                projectId=job_reference['projectId'],
+                jobId=job_reference['jobId'],
+                pageToken=page_token).execute()
+
+        if current_row < total_rows:
+            raise InvalidPageToken()
+
+        return schema, result_pages
+
+    def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
+        job_id = uuid.uuid4().hex
+        rows = []
+        remaining_rows = len(dataframe)
+
+        if verbose:
+            total_rows = remaining_rows
+            sys.stdout.write("\n\n")
+            sys.stdout.flush()
+
+        for index, row in dataframe.reset_index(drop=True).iterrows():
+            row_dict = dict()
+            row_dict['json'] = json.loads(row.to_json(force_ascii=False,
+                                                      date_unit='s',
+                                                      date_format='iso'))
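+            # insertId gives BigQuery a best-effort de-duplication key for rows
+            # re-sent on retry; it combines this upload's job_id with the row's
+            # position in the DataFrame.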
+            row_dict['insertId'] = job_id + str(index)
+            rows.append(row_dict)
+            remaining_rows -= 1
+
+            if (len(rows) % chunksize == 0) or (remaining_rows == 0):
+                if verbose:
+                    sys.stdout.write("\rStreaming Insert is {0}% Complete".format(((total_rows - remaining_rows) * 100) / total_rows))
+                    sys.stdout.flush()
+
+                body = {'rows': rows}
+                response = self.service.tabledata().insertAll(
+                    projectId=self.project_id,
+                    datasetId=dataset_id,
+                    tableId=table_id,
+                    body=body).execute()
+                if 'insertErrors' in response:
+                    raise UnknownGBQException(response)
+
+                sleep(1)  # Maintains the inserts "per second" rate per API
+                rows = []
+
+        if verbose:
+            sys.stdout.write("\n")
+            sys.stdout.flush()
+
+def _parse_data(schema, rows):
+    # see: http://pandas.pydata.org/pandas-docs/dev/missing_data.html#missing-data-casting-rules-and-indexing
+    dtype_map = {'INTEGER': np.dtype(float),
+                 'FLOAT': np.dtype(float),
+                 'TIMESTAMP': 'M8[ns]'}  # This seems to be buggy without
+                                         # nanosecond indicator

-def _authenticate():
-    """
-    For testing, we abstract the authentication to BigQuery API.
-    Presently this is implemented using the bq.py Client.Get()
-    method. Any exceptions raised are considered fatal, so we
-    do not process them.
+    fields = schema['fields']
+    col_types = [field['type'] for field in fields]
+    col_names = [field['name'].encode('ascii', 'ignore') for field in fields]
+    col_dtypes = [dtype_map.get(field['type'], object) for field in fields]
+    page_array = np.zeros((len(rows),),
+                          dtype=zip(col_names, col_dtypes))

-    Returns
-    -------
-    BigqueryClient : Configured connection to Google BigQuery
-    """
-    return bq.Client.Get()
+    for row_num, raw_row in enumerate(rows):
+        entries = raw_row.get('f', [])
+        for col_num, field_type in enumerate(col_types):
+            field_value = _parse_entry(entries[col_num].get('v', ''),
+                                       field_type)
+            page_array[row_num][col_num] = field_value

+    return DataFrame(page_array)

 def _parse_entry(field_value, field_type):
-    """
-    Given a value and the corresponding BigQuery data type,
-    perform any operations needed and return in a format
-    appropriate for a numpy record dictionary
-
-    Parameters
-    ----------
-    field_value : Source object to be transformed
-    field_type : String representation of Google BigQuery
-                 data type (per schema)
-
-    Returns
-    -------
-    field_value : object or primitive of type corresponding
-                  to field_type
-    """
-
-    # Avoid any casting problems
     if field_value is None or field_value == 'null':
         return None
     if field_type == 'INTEGER' or field_type == 'FLOAT':
-        field_value = float(field_value)
+        return float(field_value)
     elif field_type == 'TIMESTAMP':
         timestamp = datetime.utcfromtimestamp(float(field_value))
-        field_value = np.datetime64(timestamp)
+        return np.datetime64(timestamp)
     elif field_type == 'BOOLEAN':
-        field_value = field_value == 'true'
-    elif field_type == 'STRING':
-        field_value = field_value
-    else:
-        field_value = str(field_value)
+        return field_value == 'true'
     return field_value

+def _test_imports():
+    _GOOGLE_API_CLIENT_INSTALLED
+    _GOOGLE_API_CLIENT_VALID_VERSION
+    _GOOGLE_FLAGS_INSTALLED
+    _GOOGLE_FLAGS_VALID_VERSION
+    _HTTPLIB2_INSTALLED

-def _parse_page(raw_page, col_names, col_types, col_dtypes):
-    """
-    Given a list of rows produced by the client.apiclient.tabledata().list(),
-    build a numpy array with proper dtypes and column names as specified
-    by the arguments.
+    if compat.PY3:
+        raise NotImplementedError("Google's libraries do not support Python 3 yet")

-    Parameters
-    ----------
-    raw_page : Resulting list of rows from a page retrieved via
-               bigquery API
-               client.apiclient.tabledata().list().execute()['rows']
-    col_names: An ordered list of names for the columns
-    col_types: String representation of the BigQuery DataType for that
-               column
-    col_dtypes: Target numpy.dtype for the column
+    if not _GOOGLE_API_CLIENT_INSTALLED:
+        raise ImportError('Could not import Google API Client.')

-    Returns
-    -------
-    page_array : numpy record array corresponding
-                 to the page data
-    """
+    if not _GOOGLE_FLAGS_INSTALLED:
+        raise ImportError('Could not import Google Command Line Flags Module.')

-    # Should be at most 100,000 per the API, but this could
-    # be increased in the future. Should only be less than
-    # this for the last page to reduce API calls
-    page_row_count = len(raw_page)
+    if not _GOOGLE_API_CLIENT_VALID_VERSION:
+        raise ImportError("pandas requires google-api-python-client >= 1.2.0 for Google "
+                          "BigQuery support, current version " + _GOOGLE_API_CLIENT_VERSION)

-    # Place to hold the results for a page of data
-    page_array = np.zeros((page_row_count,), dtype=zip(col_names, col_dtypes))
-    for row_num, raw_row in enumerate(raw_page):
-        entries = raw_row.get('f', [])
-        # Iterate over each entry - setting proper field types
-        for col_num, field_type in enumerate(col_types):
-            # Process the field's types using schema
-            field_value = _parse_entry(entries[col_num].get('v', ''),
-                                       field_type)
-            # Fill the value into the final array
-            page_array[row_num][col_num] = field_value
+    if not _GOOGLE_FLAGS_VALID_VERSION:
+        raise ImportError("pandas requires python-gflags >= 2.0.0 for Google "
+                          "BigQuery support, current version " + _GOOGLE_FLAGS_VERSION)

-    return page_array
+    if not _HTTPLIB2_INSTALLED:
+        raise ImportError("pandas requires httplib2 for Google BigQuery support")

+def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False):
+    """Load data from Google BigQuery.

-def _parse_data(client, job, index_col=None, col_order=None):
-    """
-    Iterate through the query results and piece together the
-    final DataFrame. Builds a DataFrame for each page of
-    results, then concatenates them together when finished.
-    To save memory, we use numpy record arrays to build these
-    DataFrames.
+    THIS IS AN EXPERIMENTAL LIBRARY
+
+    The main method a user calls to execute a query in Google BigQuery and read results
+    into a pandas DataFrame using the v2 Google API client for Python. Documentation for
+    the API is available at https://developers.google.com/api-client-library/python/.
+    Authentication to the Google BigQuery service is via OAuth 2.0 using the product name
+    'pandas GBQ'.

     Parameters
     ----------
-    client: An instance of bq.Client
-    job: An array containing the job info for a completed query
-    index_col: str (optional)
+    query : str
+        SQL-Like Query to return data values
+    project_id : str
+        Google BigQuery Account project ID.
+    index_col : str (optional)
         Name of result column to use for index in results DataFrame
-    col_order: list() (optional)
+    col_order : list(str) (optional)
         List of BigQuery column names in the desired order for results
         DataFrame
+    reauth : boolean (default False)
+        Force Google BigQuery to reauthenticate the user. This is useful
+        if multiple accounts are used.

     Returns
     -------
-    df: pandas DataFrame
+    df: DataFrame
         DataFrame representing results of query

-    Raises:
-    ------
-    InvalidColumnOrder:
-        Raised if 'col_order' parameter doesn't match returned DataFrame
-    BigqueryError:
-        Raised by bigquery_client if a Google API error is encountered
-
-
-    Notes:
-    -----
-    This script relies on Google being consistent with their
-    pagination API. We are using the most flexible iteration method
-    that we could find in the bq.py/bigquery_client.py API's, but
-    these have undergone large amounts of change recently.
     """
-    # dtype Map -
-    # see: http://pandas.pydata.org/pandas-docs/dev/missing_data.html#missing-data-casting-rules-and-indexing
-    dtype_map = {'INTEGER': np.dtype(float),
-                 'FLOAT': np.dtype(float),
-                 'TIMESTAMP': 'M8[ns]'}  # This seems to be buggy without
-                                         # nanosecond indicator
-
-    # We first need the schema to get information about the columns of
-    # our dataframe.
-
-    table_dict = job['configuration']['query']['destinationTable']
-    fields = client.GetTableSchema(table_dict)['fields']
-
-    # Get the schema into a format useable to create our
-    # dataframe
-    col_dtypes = []
-    col_types = []
-    col_names = []
-
-    # TODO: Do this in one clean step
-    for field in fields:
-        col_types.append(field['type'])
-        # Note the encoding... numpy doesn't like titles that are UTF8, which
-        # is the return type from the API
-        col_names.append(field['name'].encode('ascii', 'ignore'))
-        # Note, it would be nice to use 'str' types, but BigQuery doesn't have
-        # a fixed length in mind - just maxes out at 64k
-        col_dtypes.append(dtype_map.get(field['type'], object))
-
-    # How many columns are there
-    num_columns = len(col_names)
-
-    # Iterate over the result rows.
-    # Since Google's API now requires pagination of results,
-    # we do that here. The following is repurposed from
-    # bigquery_client.py :: Client._JobTableReader._ReadOnePage
-
-    # TODO: Enable Reading From Table,
-    # see Client._TableTableReader._ReadOnePage
-
-    # Initially, no page token is set
-    page_token = None
-
-    # This number is the current max results per page
-    max_rows = bigquery_client._MAX_ROWS_PER_REQUEST
-
-    # How many rows in result set? Initialize to max_rows
-    total_rows = max_rows
-
-    # This is the starting row for a particular page...
-    # is ignored if page_token is present, though
-    # it may be useful if we wish to implement SQL like LIMITs
-    # with minimums
-    start_row = 0
-
-    # Keep our page DataFrames until the end when we concatenate them
-    dataframe_list = list()
-
-    current_job = job['jobReference']
-
-    # Iterate over all rows
-    while start_row < total_rows:
-        # Setup the parameters for getQueryResults() API Call
-        kwds = dict(current_job)
-        kwds['maxResults'] = max_rows
-        # Sets the timeout to 0 because we assume the table is already ready.
-        # This is because our previous call to Query() is synchronous
-        # and will block until it's actually done
-        kwds['timeoutMs'] = 0
-        # Use start row if there's no page_token ... in other words, the
-        # user requested to start somewhere other than the beginning...
-        # presently this is not a parameter to read_gbq(), but it will be
-        # added eventually.
-        if page_token:
-            kwds['pageToken'] = page_token
-        else:
-            kwds['startIndex'] = start_row
-        data = client.apiclient.jobs().getQueryResults(**kwds).execute()
-        if not data['jobComplete']:
-            raise bigquery_client.BigqueryError('Job was not completed, or was invalid')
-
-        # How many rows are there across all pages?
-        # Note: This is presently the only reason we don't just use
-        # _ReadOnePage() directly
-        total_rows = int(data['totalRows'])
-
-        page_token = data.get('pageToken', None)
-        raw_page = data.get('rows', [])
-        page_array = _parse_page(raw_page, col_names, col_types, col_dtypes)
-
-        start_row += len(raw_page)
-        if total_rows > 0:
-            completed = (100 * start_row) / total_rows
-            logger.info('Remaining Rows: ' + str(total_rows - start_row) + '('
-                        + str(completed) + '% Complete)')
-        else:
-            logger.info('No Rows')
+    _test_imports()

-        dataframe_list.append(DataFrame(page_array))
+    if not project_id:
+        raise TypeError("Missing required parameter: project_id")

-        # Did we get enough rows? Note: gbq.py stopped checking for this
-        # but we felt it was still a good idea.
-        if not page_token and not raw_page and start_row != total_rows:
-            raise bigquery_client.BigqueryInterfaceError(
-                'Not enough rows returned by server. Expected: {0} Rows, But '
-                'Received {1}'.format(total_rows, start_row)
-            )
+    connector = GbqConnector(project_id, reauth=reauth)
+    schema, pages = connector.run_query(query)
+    dataframe_list = []
+    while len(pages) > 0:
+        page = pages.pop(0)
+        dataframe_list.append(_parse_data(schema, page))

-    # Build final dataframe
-    final_df = concat(dataframe_list, ignore_index=True)
+    final_df = concat(dataframe_list, ignore_index=True)

     # Reindex the DataFrame on the provided column
     if index_col is not None:
-        if index_col in col_names:
-            final_df.set_index(index_col, inplace=True)
-            col_names.remove(index_col)
+        if index_col in final_df.columns:
+            final_df.set_index(index_col, inplace=True)
         else:
             raise InvalidColumnOrder(
                 'Index column "{0}" does not exist in DataFrame.'
@@ -330,7 +376,7 @@ def _parse_data(client, job, index_col=None, col_order=None):

     # Change the order of columns in the DataFrame based on provided list
     if col_order is not None:
-        if sorted(col_order) == sorted(col_names):
+        if sorted(col_order) == sorted(final_df.columns):
             final_df = final_df[col_order]
         else:
             raise InvalidColumnOrder(
@@ -343,188 +389,47 @@ def _parse_data(client, job, index_col=None, col_order=None):
     final_df._data = final_df._data.downcast(dtypes='infer')
     return final_df

-
-def to_gbq(dataframe, destination_table, schema=None, col_order=None,
-           if_exists='fail', **kwargs):
+def to_gbq(dataframe, destination_table, project_id=None, chunksize=10000,
+           verbose=True, reauth=False):
     """Write a DataFrame to a Google BigQuery table.

     THIS IS AN EXPERIMENTAL LIBRARY

-    If the table exists, the DataFrame will be appended. If not, a new table
-    will be created, in which case the schema will have to be specified. By
-    default, rows will be written in the order they appear in the DataFrame,
-    though the user may specify an alternative order.
+    If the table exists, the dataframe will be written to the table using
+    the defined table schema and column types. For simplicity, this method
+    uses the Google BigQuery streaming API. The to_gbq method chunks data
+    into a default chunk size of 10,000. Failures return the complete error
+    response which can be quite long depending on the size of the insert.
+    There are several important limitations of the Google streaming API
+    which are detailed at:
+    https://developers.google.com/bigquery/streaming-data-into-bigquery.

     Parameters
     ----------
     dataframe : DataFrame
         DataFrame to be written
     destination_table : string
-        name of table to be written, in the form 'dataset.tablename'
-    schema : sequence (optional)
-        list of column types in order for data to be inserted,
-        e.g. ['INTEGER', 'TIMESTAMP', 'BOOLEAN']
-    col_order : sequence (optional)
-        order which columns are to be inserted,
-        e.g. ['primary_key', 'birthday', 'username']
-    if_exists : {'fail', 'replace', 'append'} (optional)
-        - fail: If table exists, do nothing.
-        - replace: If table exists, drop it, recreate it, and insert data.
-        - append: If table exists, insert data. Create if does not exist.
-    kwargs are passed to the Client constructor
-
-    Raises
-    ------
-    SchemaMissing :
-        Raised if the 'if_exists' parameter is set to 'replace', but no schema
-        is specified
-    TableExists :
-        Raised if the specified 'destination_table' exists but the 'if_exists'
-        parameter is set to 'fail' (the default)
-    InvalidSchema :
-        Raised if the 'schema' parameter does not match the provided DataFrame
-    """
-
-    if not _BQ_INSTALLED:
-        if sys.version_info >= (3, 0):
-            raise NotImplementedError('gbq module does not support Python 3 '
-                                      'yet')
-        else:
-            raise ImportError('Could not import Google BigQuery Client.')
-
-    if not _BQ_VALID_VERSION:
-        raise ImportError("pandas requires bigquery >= 2.0.17 for Google "
-                          "BigQuery support, current version " + _BQ_VERSION)
-
-    ALLOWED_TYPES = ['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP',
-                     'RECORD']
-
-    if if_exists == 'replace' and schema is None:
-        raise SchemaMissing('Cannot replace a table without specifying the '
-                            'data schema')
-    else:
-        client = _authenticate()
-        table_reference = client.GetTableReference(destination_table)
-        if client.TableExists(table_reference):
-            if if_exists == 'fail':
-                raise TableExistsFail('Cannot overwrite existing tables if '
-                                      '\'if_exists="fail"\'')
-            else:
-                # Build up a string representation of the
-                # table's schema. Since the table already
-                # exists, we ask ask the API for it, which
-                # is returned in a list of dictionaries
-                # describing column data. Iterate over these
-                # and build up a string of form:
-                # "col_name1 : col_type1, col_name2 : col_type2..."
-                schema_full = client.GetTableSchema(
-                    dict(table_reference)
-                )['fields']
-                schema = ''
-                for count, row in enumerate(schema_full):
-                    if count > 0:
-                        schema += ', '
-                    schema += row['name'] + ':' + row['type']
-        else:
-            logger.info('Creating New Table')
-            if schema is None:
-                raise SchemaMissing('Cannot create a new table without '
-                                    'specifying the data schema')
-            else:
-                columns = dataframe.columns
-                if len(schema) != len(columns):
-                    raise InvalidSchema('Incorrect number of columns in '
-                                        'schema')
-                else:
-                    schema_string = ''
-                    for count, name in enumerate(columns):
-                        if count > 0:
-                            schema_string += ', '
-                        column_type = schema[count].upper()
-                        if column_type in ALLOWED_TYPES:
-                            schema_string += name + ':' + schema[count].lower()
-                        else:
-                            raise InvalidSchema('Invalid Type: ' + column_type
-                                                + ". Must be one of: " +
-                                                str(ALLOWED_TYPES))
-                    schema = schema_string
-
-    opts = kwargs
-    opts['sync'] = True
-    opts['skip_leading_rows'] = 1
-    opts['encoding'] = 'UTF-8'
-    opts['max_bad_records'] = 0
-
-    # See: https://developers.google.com/bigquery/docs/reference/v2/jobs
-    if if_exists == 'replace':
-        opts['write_disposition'] = 'WRITE_TRUNCATE'
-    elif if_exists == 'append':
-        opts['write_disposition'] = 'WRITE_APPEND'
-
-    with tempfile.NamedTemporaryFile() as csv_file:
-        dataframe.to_csv(csv_file.name, index=False, encoding='utf-8')
-        job = client.Load(table_reference, csv_file.name, schema=schema,
-                          **opts)
-
-
-def read_gbq(query, project_id=None, destination_table=None, index_col=None,
-             col_order=None, **kwargs):
-    """Load data from Google BigQuery.
-
-    THIS IS AN EXPERIMENTAL LIBRARY
-
-    The main method a user calls to load data from Google BigQuery into a
-    pandas DataFrame. This is a simple wrapper for Google's bq.py and
-    bigquery_client.py, which we use to get the source data. Because of this,
-    this script respects the user's bq settings file, '~/.bigqueryrc', if it
-    exists. Such a file can be generated using 'bq init'. Further, additional
-    parameters for the query can be specified as either ``**kwds`` in the
-    command, or using FLAGS provided in the 'gflags' module. Particular options
-    can be found in bigquery_client.py.
-
-    Parameters
-    ----------
-    query : str
-        SQL-Like Query to return data values
-    project_id : str (optional)
-        Google BigQuery Account project ID. Optional, since it may be
-        located in ~/.bigqueryrc
-    index_col : str (optional)
-        Name of result column to use for index in results DataFrame
-    col_order : list(str) (optional)
-        List of BigQuery column names in the desired order for results
-        DataFrame
-    destination_table : string (optional)
-        If provided, send the results to the given table.
-    **kwargs :
-        To be passed to bq.Client.Create(). Particularly: 'trace',
-        'sync', 'api', 'api_version'
-
-    Returns
-    -------
-    df: DataFrame
-        DataFrame representing results of query
+        Name of table to be written, in the form 'dataset.tablename'
+    project_id : str
+        Google BigQuery Account project ID.
+    chunksize : int (default 10000)
+        Number of rows to be inserted in each chunk from the dataframe.
+    verbose : boolean (default True)
+        Show percentage complete
+    reauth : boolean (default False)
+        Force Google BigQuery to reauthenticate the user. This is useful
+        if multiple accounts are used.
     """

-    if not _BQ_INSTALLED:
-        if sys.version_info >= (3, 0):
-            raise NotImplementedError('gbq module does not support Python 3 '
-                                      'yet')
-        else:
-            raise ImportError('Could not import Google BigQuery Client.')
-
-    if not _BQ_VALID_VERSION:
-        raise ImportError('pandas requires bigquery >= 2.0.17 for Google '
-                          'BigQuery support, current version ' + _BQ_VERSION)
+    _test_imports()

-    query_args = kwargs
-    query_args['project_id'] = project_id
-    query_args['query'] = query
-    query_args['destination_table'] = destination_table
-    query_args['sync'] = True
+    if not project_id:
+        raise TypeError("Missing required parameter: project_id")

-    client = _authenticate()
+    if '.' not in destination_table:
+        raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId'")
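+    # A light sanity check only; the streaming API appends to an existing
+    # table and will not create 'datasetId.tableId' on the fly, so the
+    # destination must already exist in BigQuery.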
-    job = client.Query(**query_args)
+    connector = GbqConnector(project_id, reauth=reauth)
+    dataset_id, table_id = destination_table.rsplit('.', 1)

-    return _parse_data(client, job, index_col=index_col, col_order=col_order)
+    connector.load_data(dataframe, dataset_id, table_id, chunksize, verbose)
diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py
index 124658ac80234..0f595f75bc66f 100644
--- a/pandas/io/tests/test_gbq.py
+++ b/pandas/io/tests/test_gbq.py
@@ -1,474 +1,290 @@
 import ast
+import datetime
+import json
 import nose
 import os
+import pytz
 import shutil
 import subprocess
+import sys
+import platform
+from time import sleep

 import numpy as np

+from pandas import NaT
+from pandas.compat import u
+from pandas.core.frame import DataFrame
 import pandas.io.gbq as gbq
 import pandas.util.testing as tm
-from pandas.core.frame import DataFrame
-from pandas.util.testing import with_connectivity_check
-from pandas.compat import u
-from pandas import NaT

+PROJECT_ID = None
+VERSION = platform.python_version()

-try:
-    import bq
-    import bigquery_client
-    import gflags as flags
-except ImportError:
-    raise nose.SkipTest
-
-####################################################################################
-# Fake Google BigQuery Client
-
-class FakeClient:
-    def __init__(self):
-        self.apiclient = FakeApiClient()
-    def GetTableSchema(self,table_dict):
-        retval = {'fields': [
-                    {'type': 'STRING', 'name': 'corpus', 'mode': 'NULLABLE'},
-                    {'type': 'INTEGER', 'name': 'corpus_date', 'mode': 'NULLABLE'},
-                    {'type': 'STRING', 'name': 'word', 'mode': 'NULLABLE'},
-                    {'type': 'INTEGER', 'name': 'word_count', 'mode': 'NULLABLE'}
-                  ]}
-        return retval
-
-# Fake Google BigQuery API Client
-class FakeApiClient:
-    def __init__(self):
-        self._fakejobs = FakeJobs()
-
-    def jobs(self):
-        return self._fakejobs
-
-class FakeJobs:
-    def __init__(self):
-        self._fakequeryresults = FakeResults()
-
-    def getQueryResults(self, job_id=None, project_id=None,
-                        max_results=None, timeout_ms=None, **kwargs):
-        return self._fakequeryresults
-
-class FakeResults:
-    def execute(self):
-        return {'rows': [ {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brave'}, {'v': '3'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'attended'}, {'v': '1'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'treason'}, {'v': '1'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'islanders'}, {'v': '1'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'heed'}, {'v': '3'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'alehouse'}, {'v': '1'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'corrigible'}, {'v': '1'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brawl'}, {'v': '2'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': "'"}, {'v': '17'}]},
-                          {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'troubled'}, {'v': '1'}]}
-                        ],
-                'kind': 'bigquery#tableDataList',
-                'etag': '"4PTsVxg68bQkQs1RJ1Ndewqkgg4/hoRHzb4qfhJAIa2mEewC-jhs9Bg"',
-                'totalRows': '10',
-                'jobComplete' : True}
-
-####################################################################################
-
-class TestGbq(tm.TestCase):
+def missing_bq():
+    try:
+        subprocess.call('bq')
+        return False
+    except OSError:
+        return True
+
+def test_requirements():
+    try:
+        gbq._test_imports()
+    except (ImportError, NotImplementedError) as import_exception:
+        raise nose.SkipTest(import_exception)
+
+class TestGBQConnectorIntegration(tm.TestCase):
     def setUp(self):
-        with open(self.fake_job_path, 'r') as fin:
-            self.fake_job = ast.literal_eval(fin.read())
-
-        self.test_data_small = [{'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brave'}, {'v': '3'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'attended'}, {'v': '1'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'treason'}, {'v': '1'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'islanders'}, {'v': '1'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'heed'}, {'v': '3'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'alehouse'}, {'v': '1'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'corrigible'}, {'v': '1'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brawl'}, {'v': '2'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': "'"}, {'v': '17'}]},
-                                {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'troubled'},
-                                 {'v': '1'}]}]
-
-        self.correct_data_small = np.array(
-            [('othello', 1603, 'brave', 3),
-             ('othello', 1603, 'attended', 1),
-             ('othello', 1603, 'treason', 1),
-             ('othello', 1603, 'islanders', 1),
-             ('othello', 1603, 'heed', 3),
-             ('othello', 1603, 'alehouse', 1),
-             ('othello', 1603, 'corrigible', 1),
-             ('othello', 1603, 'brawl', 2),
-             ('othello', 1603, "'", 17),
-             ('othello', 1603, 'troubled', 1)
-             ],
-            dtype=[('corpus', 'S16'),
-                   ('corpus_date', '