Schema conflict when storing dataframes with datetime objects using load_table_from_dataframe() #6542

Closed
tatome opened this issue Nov 16, 2018 · 15 comments
Labels: api: bigquery (Issues related to the BigQuery API.), type: question (Request for information or clarification. Not an issue.)

Comments

@tatome

tatome commented Nov 16, 2018

The output

Traceback (most recent call last):
  File "/tmp/mwe.py", line 14, in <module>
    print(job.result())
  File "/home/tatome/anaconda3/envs/default_env/lib/python3.6/site-packages/google/cloud/bigquery/job.py", line 688, in result
    return super(_AsyncJob, self).result(timeout=timeout)
  File "/home/tatome/anaconda3/envs/default_env/lib/python3.6/site-packages/google/api_core/future/polling.py", line 120, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table [project+dataset].test. Field test_column has changed type from DATETIME to TIMESTAMP.

is produced by the following code:

import pandas as pd
from google.cloud.bigquery import Client
client = Client(project='[project]')
table_name = 'test'
job = client.query(f'CREATE TABLE [dataset].{table_name} ( idx INT64, test_column DATETIME )')
job.result()
df = pd.DataFrame({'idx' : [0,1], 'test_column' : ['2018-12-24', '2019-01-01']})
df['test_column'] = pd.to_datetime(df['test_column']).values.astype('datetime64[ms]')
dataset_ref = client.dataset('[dataset]')
table_ref = dataset_ref.table(table_name)
df = df.set_index('idx')
job = client.load_table_from_dataframe(df, table_ref, location='EU')
print(job.result())
@tseaver tseaver added the type: question and api: bigquery labels Nov 16, 2018
@tseaver
Contributor

tseaver commented Dec 7, 2018

@tatome I'm not reproducing this one:

>>> import pandas as pd
>>> df = pd.DataFrame({'idx': [0, 1], 'test_column': ['2018-12-24', '2019-01-01']})
>>> df
   idx test_column
0    0  2018-12-24
1    1  2019-01-01
>>> pd.to_datetime(df['test_column'])
0   2018-12-24
1   2019-01-01
Name: test_column, dtype: datetime64[ns]
>>> df['test_column'] = pd.to_datetime(df['test_column']).values.astype('datetime64[ms]')
>>> df
   idx test_column
0    0  2018-12-24
1    1  2019-01-01
>>> df['test_column']
0   2018-12-24
1   2019-01-01
Name: test_column, dtype: datetime64[ns]
>>> df['test_column'][0]
Timestamp('2018-12-24 00:00:00')
>>> type(df['test_column'][0])
<class 'pandas._libs.tslibs.timestamps.Timestamp'>
>>> df = df.set_index('idx')
>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> ds = client.create_dataset('gcp_6542')
>>> tref = ds.table('test')
>>> job = client.load_table_from_dataframe(df, tref, location='US')
>>> print(job.result())
<google.cloud.bigquery.job.LoadJob object at 0x7f72a7adc6d8>
>>> job.done()
True

Can you suggest something I missed?

@tseaver tseaver added the status: awaiting information label and removed the status: investigating label Dec 7, 2018
@tiagobevilaqua

tiagobevilaqua commented Mar 12, 2019

@tseaver I think that you missed setting the schema first.

Because that way (I guess) you are using BigQuery's schema autodetection.

I'm facing the same problem when trying to load data from a dataframe into a table with a previously defined schema.

df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 29032 entries, 1993-07-01 to 2018-10-01
Data columns (total 3 columns):
subcategory    29032 non-null object
state_code     29032 non-null object
services       29032 non-null object
dtypes: object(3)
memory usage: 1.5+ MB

Pre-defining schema:

schema = [
    bq.SchemaField('date', 'DATETIME', mode='REQUIRED'),
    bq.SchemaField('subcategory', 'STRING', mode='REQUIRED'),
    bq.SchemaField('state_code', 'GEOGRAPHY', mode='REQUIRED'),
    bq.SchemaField('services', 'INTEGER', mode='REQUIRED')
    ]
table = bq.Table(table_ref, schema=schema)

table.schema
[SchemaField('date', 'DATETIME', 'REQUIRED', None, ()), #DF index
 SchemaField('subcategory', 'STRING', 'REQUIRED', None, ()),
 SchemaField('state_code', 'GEOGRAPHY', 'REQUIRED', None, ()),
 SchemaField('services', 'INTEGER', 'REQUIRED', None, ())]

Error when it runs:

client.load_table_from_dataframe(df, table_ref).result()

BadRequest                                Traceback (most recent call last)
<ipython-input-122-97f4f20faa6a> in <module>()
----> 1 job.result()

/usr/local/lib/python3.6/dist-packages/google/cloud/bigquery/job.py in result(self, timeout, retry)
    701             self._begin(retry=retry)
    702         # TODO: modify PollingFuture so it can pass a retry argument to done().
--> 703         return super(_AsyncJob, self).result(timeout=timeout)
    704 
    705     def cancelled(self):

/usr/local/lib/python3.6/dist-packages/google/api_core/future/polling.py in result(self, timeout)
    125             # pylint: disable=raising-bad-type
    126             # Pylint doesn't recognize that this is valid in this case.
--> 127             raise self._exception
    128 
    129         return self._result

BadRequest: 400 Provided Schema does not match Table myproject:mydataset.mytable. Field date has changed type from DATETIME to TIMESTAMP

Does it help?

Thanks.
Regards,
Tiago Bevilaqua

@tseaver
Contributor

tseaver commented Mar 12, 2019

@tiagobevilaqua You are converting the test_column values to the Pandas type datetime64[ms]. When rendered to Parquet format (which is what Client.load_table_from_dataframe does under the hood), that value gets stored as a NumPy datetime64[ns] value, which gets mapped onto TIMESTAMP when read, rather than DATETIME.
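
A quick way to see what type pyarrow assigns to such a column is to build an Arrow table from the dataframe and print its schema. This is a minimal sketch; it assumes pyarrow is installed, and reuses the test_column name from the example above:

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'test_column': pd.to_datetime(['2018-12-24', '2019-01-01'])})
df['test_column'] = df['test_column'].values.astype('datetime64[ms]')

# The Arrow schema shows a timezone-naive timestamp type for the column;
# per the explanation above, BigQuery's Parquet loader reads that as TIMESTAMP.
print(df.dtypes)
print(pa.Table.from_pandas(df).schema)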

@tiagobevilaqua

@tseaver So it means that I cannot force the data to be stored as DATETIME in BigQuery using load_table_from_dataframe?

@tseaver
Contributor

tseaver commented Mar 13, 2019

@tiagobevilaqua

it means that I cannot force the data to be stored as DATETIME in BigQuery using load_table_from_dataframe?

Not if you've already converted the values using the datetime64[ms] type coercion. If you render the datetime column values as strings in one of the canonical formats for DATETIME, then they should load properly into a DATETIME column.
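
For reference, a minimal sketch of that string rendering; the column name here is only illustrative:

import pandas as pd

df = pd.DataFrame({'some_datetime': pd.to_datetime(['2018-12-24', '2019-01-01'])})
# Render the values in a canonical DATETIME format; the column becomes a plain string (object) column.
df['some_datetime'] = df['some_datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')
print(df.dtypes)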

@tiagobevilaqua

@tseaver Sorry if I'm bothering you...

I just tried using STRING values (in the canonical format YYYY-MM-DD) but it still doesn't load the data:

df.info()
<class 'pandas.core.frame.DataFrame'>
Index: 29032 entries, 1993-07-01 to 2018-10-01
Data columns (total 3 columns):
subcategory    29032 non-null object
state_code     29032 non-null object
services       29032 non-null object
dtypes: object(3)
memory usage: 907.2+ KB

I'm getting a similar error message:

client.load_table_from_dataframe(df, table_ref).result()

BadRequest: 400 Provided Schema does not match Table myproject:mydataset.mytable. Field date has changed type from DATETIME to STRING

Can you suggest something I missed?

@tseaver
Contributor

tseaver commented Mar 14, 2019

@tiagobevilaqua Not to worry. Summoning @tswast to see if he can help figure out the right path.

@tswast
Contributor

tswast commented Mar 15, 2019

I think this has to do with the conversion from pandas dataframe to parquet. We'd actually have the same problem for most other data formats as well since DATETIME is a somewhat unique-to-BigQuery datatype.

Edit: Avro has a logicalType for DATETIME.

  {'name': 'dt_col',
   'type': ['null', {'type': 'string', 'logicalType': 'datetime'}]},

I'll have to investigate further to see whether Parquet has something equivalent. Then we'll have to convince pyarrow to use that type when encoding a dataframe with a naive datetime, or maybe it has a way to supply logical types directly?

@tswast
Contributor

tswast commented Mar 15, 2019

I can't find an equivalent DATETIME in the Parquet logical types list at https://github.com/apache/parquet-format/blob/master/LogicalTypes.md. I've filed https://jira.apache.org/jira/browse/PARQUET-1545

In the meantime, we have a few options:

  • (1) Switch to another serialization format such as Avro, JSON, or ORC.
  • (2) Wait for Parquet to add support for DATETIME and then for that to get implemented in pyarrow and BigQuery.

Thoughts?

@tiagobevilaqua

Good to know that!

I'll try a workaround with the 1st option and wait for the 2nd one.

Thanks a lot for your help.

@tswast
Contributor

tswast commented Mar 19, 2019

https://jira.apache.org/jira/browse/PARQUET-1545 wasn't the cause. Parquet does support a logical type for naive datetime and timezone-aware timestamp. Arrow also supports both naive and timezone-aware timestamps, but it seems by default it converts to naive types (https://issues.apache.org/jira/browse/ARROW-4965).

I haven't seen where pyarrow sets isAdjustedToUTC in its Parquet serialization, yet. It might not, in which case that'll be another feature request for pyarrow.
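
A sketch of one way to check what pyarrow actually writes; it assumes pyarrow is installed, and the column name and file path are only illustrative:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({'dt_col': pd.to_datetime(['2018-12-24', '2019-01-01'])})
pq.write_table(pa.Table.from_pandas(df), '/tmp/dt_col.parquet')

# Printing the Parquet schema shows the physical type and the timestamp
# annotation chosen for the naive datetime column, which should reveal
# whether isAdjustedToUTC is set.
print(pq.ParquetFile('/tmp/dt_col.parquet').schema)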

@tswast
Contributor

tswast commented Nov 12, 2019

Closed by #8105 and #9064, which allow BigQuery types to be overridden by supplying a schema to LoadJobConfig.schema in the job_config argument.

# [START bigquery_load_table_dataframe]
from google.cloud import bigquery
import pandas

# TODO(developer): Construct a BigQuery client object.
# client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

records = [
    {"title": u"The Meaning of Life", "release_year": 1983},
    {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
    {"title": u"Life of Brian", "release_year": 1979},
    {"title": u"And Now for Something Completely Different", "release_year": 1971},
]
dataframe = pandas.DataFrame(
    records,
    # In the loaded table, the column order reflects the order of the
    # columns in the DataFrame.
    columns=["title", "release_year"],
    # Optionally, set a named index, which can also be written to the
    # BigQuery table.
    index=pandas.Index(
        [u"Q24980", u"Q25043", u"Q24953", u"Q16403"], name="wikidata_id"
    ),
)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
        # Indexes are written if included in the schema by name.
        bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Optionally, set the write disposition. BigQuery appends loaded rows
    # to an existing table by default, but with WRITE_TRUNCATE write
    # disposition it replaces the table with the loaded data.
    write_disposition="WRITE_TRUNCATE",
)

job = client.load_table_from_dataframe(
    dataframe,
    table_id,
    job_config=job_config,
    location="US",  # Must match the destination dataset location.
)  # Make an API request.
job.result()  # Wait for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
# [END bigquery_load_table_dataframe]

@tswast tswast closed this as completed Nov 12, 2019
@francois-baptiste

Hi @tswast, I do not understand how supplying a schema to LoadJobConfig.schema can fix the problem. Can you provide an example specific to this issue (with a DATETIME field)? Thanks!

@tswast
Contributor

tswast commented Mar 18, 2020

Unfortunately, the backend API for loading Parquet files (the serialization format we use for dataframes) does not support DATETIME values, yet.

@francois-baptiste

Thank you for the clarification @tswast!
While waiting for DATETIME in Parquet files to be supported by the backend API, here is, for the record, a workaround that loads an in-memory file with load_table_from_file.

# import io
# import datetime
# import pandas as pd
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'
# table_id = 'my_table'

dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.schema = [
    bigquery.SchemaField('some_id', bigquery.enums.SqlTypeNames.STRING),
    bigquery.SchemaField('some_datetime', bigquery.enums.SqlTypeNames.DATETIME),
]

# Create some pandas dataframe
df = pd.DataFrame.from_dict([
    {'some_id': 'A', 'some_datetime': datetime.datetime(2021, 10, 4, 10, 3)},
    {'some_id': 'B', 'some_datetime': datetime.datetime(2021, 10, 4, 10, 5)},
    {'some_id': 'C', 'some_datetime': datetime.datetime(2021, 10, 4, 23, 14)}])

# Render the datetime column as strings in a canonical DATETIME format.
df['some_datetime'] = df['some_datetime'].dt.strftime("%Y-%m-%dT%H:%M:%S")

# Serialize the dataframe as newline-delimited JSON and load it from memory.
with io.StringIO(df.to_json(orient="records", lines=True)) as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

job.result()  # Waits for table load to complete.

print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_id, table_id))
