Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#1626) fix for RPC error with BQ nested fields #1638

Merged
merged 3 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/dbt/clients/agate_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dbt.compat

import agate
import json

BOM = BOM_UTF8.decode('utf-8') # '\ufeff'

Expand Down Expand Up @@ -33,6 +34,22 @@ def table_from_data(data, column_names):
return table.select(column_names)


def table_from_data_flat(data, column_names):
    """Convert an iterable of dict rows into an Agate table.

    Nested values (dicts, lists, tuples) are JSON-serialized into strings
    first, so agate stores them as text instead of trying (and failing) to
    type-cast the nested structures.

    :param data: iterable of dict rows; values are read in each dict's
        iteration order, which is assumed to line up with ``column_names``.
    :param column_names: names for the table's columns.
    :return: an ``agate.Table`` built from the flattened rows.
    """
    rows = [
        # Serialize nested structures; pass scalar values through untouched.
        [json.dumps(value) if isinstance(value, (dict, list, tuple)) else value
         for value in row.values()]
        for row in data
    ]

    return agate.Table(rows, column_names)


def empty_table():
"Returns an empty Agate table. To be used in place of None"

Expand Down
4 changes: 2 additions & 2 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ def get_timeout(cls, conn):
# NOTE(review): this span is a unified-diff rendering, not runnable source.
# The two lines building `rows` via dict() and calling table_from_data are
# the REMOVED (old) implementation; the table_from_data_flat call below them
# is the ADDED (new) one. Per the PR title, the change presumably avoids an
# RPC error with BigQuery nested fields by letting table_from_data_flat
# JSON-serialize nested values — TODO confirm against the merged file.
# Leading indentation was stripped by the page scrape.
@classmethod
def get_table_from_response(cls, resp):
# Column names come from the BigQuery response schema.
column_names = [field.name for field in resp.schema]
# --- old (removed) lines: materialized each row as a dict ---
rows = [dict(row.items()) for row in resp]
return dbt.clients.agate_helper.table_from_data(rows, column_names)
# --- new (added) lines: pass the response rows straight through ---
return dbt.clients.agate_helper.table_from_data_flat(resp,
column_names)

def raw_execute(self, sql, fetch=False):
conn = self.get_thread_connection()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from test.integration.base import DBTIntegrationTest, use_profile
import json

class TestBaseBigQueryRun(DBTIntegrationTest):
    """Integration test: nested STRUCT/ARRAY values fetched from BigQuery
    come back as JSON-serialized strings rather than raising an RPC error.
    """

    @property
    def schema(self):
        return "bigquery_test_022"

    @property
    def models(self):
        return "models"

    @property
    def project_config(self):
        return {
            'macro-paths': ['macros'],
        }

    @use_profile('bigquery')
    def test__bigquery_fetch_nested_records(self):
        sql = """
        select
          struct(
            cast('Michael' as string) as fname,
            cast('Stonebreaker' as string) as lname
          ) as user,
          [
            struct(1 as val_1, 2 as val_2),
            struct(3 as val_1, 4 as val_2)
          ] as val

        union all

        select
          struct(
            cast('Johnny' as string) as fname,
            cast('Brickmaker' as string) as lname
          ) as user,
          [
            struct(7 as val_1, 8 as val_2),
            struct(9 as val_1, 0 as val_2)
          ] as val
        """

        status, res = self.adapter.execute(sql, fetch=True)

        self.assertEqual(len(res), 2, "incorrect row count")

        # Expected JSON-serialized values, keyed by column, one entry per row.
        expected = {
            "user": [
                '{"fname": "Michael", "lname": "Stonebreaker"}',
                '{"fname": "Johnny", "lname": "Brickmaker"}'
            ],
            "val": [
                '[{"val_1": 1, "val_2": 2}, {"val_1": 3, "val_2": 4}]',
                '[{"val_1": 7, "val_2": 8}, {"val_1": 9, "val_2": 0}]'
            ]
        }

        # Check every row of every column. (A single `enumerate(expected)`
        # loop would reuse the key index as the row index, covering only one
        # row per key — and which row depends on dict iteration order.)
        for key, expected_vals in expected.items():
            for i, expected_val in enumerate(expected_vals):
                actual_val = res[i][key]
                msg = "row {} for key {} ({} vs {})".format(
                    i, key, expected_val, actual_val)
                # py2 serializes these in an unordered way - deserialize to compare
                self.assertEqual(
                    json.loads(expected_val), json.loads(actual_val), msg)