Load CSV files fails on date and numeric types #139

Closed
chinwobble opened this issue Jan 12, 2021 · 7 comments · Fixed by #166
Labels: bug, good_first_issue


Describe the bug

When you try to seed a CSV into Hive, the load fails when column types are overridden to non-string types (such as timestamp or decimal), because dbt binds every value as a string.

Steps To Reproduce

  1. Install dbt 0.18.1 and dbt-spark.
  2. Create a simple CSV file as below:
id,name,first_payment_date,maturity_date,loan_amount,installment_amount
1,LAI-1,2020-01-01,2020-10-01,100000.00,1000.00
  3. Define column types for the seed in dbt_project.yml:
seeds:
  my_new_project:
    loan_account:
      +column_types:
        id: string
        name: string
        first_payment_date: timestamp
        maturity_date: timestamp
        loan_amount: decimal(20,2)
        installment_amount: decimal(20,2)

Expected behavior

The CSV data should load into the table with the configured column types applied.

If you specify a column type as double or decimal in +column_types, the CSV load still treats the values as strings:
https://github.com/fishtown-analytics/dbt/blob/v0.18.1/core/dbt/context/providers.py#L732

I want loan_amount to be treated as a decimal. If I omit it from +column_types, it gets treated as a double.
There doesn't seem to be any easy way to override the CSV loading behaviour.

Screenshots and log output

Completed with 1 error and 0 warnings:

Runtime Error in seed loan_account (data/loan_account.csv)
  Database Error
    Error running query: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table '`default`.`loan_account`':
    - Cannot safely cast 'loan_amount': string to decimal(20,2)
    - Cannot safely cast 'installment_amount': string to decimal(20,2);

The SQL that is generated is

insert into default.loan_account values('1','LAI-1','2020-01-01', '2020-01-10','100', '100')

The desired output should be something like this

insert into default.loan_account values('1','LAI-1',cast('2020-01-01' as Date), cast('2020-01-10' as Date),100, 100)
In other words:

  1. Wrap the timestamp / date values in CAST.
  2. Don't treat numeric values as strings.

Additional info

I've customised the macro a little to support casting to date, but it's messy.

{% macro spark__load_csv_rows(model, agate_table) %}
    {% set batch_size = 1000 %}

    {% set statements = [] %}

    {% for chunk in agate_table.rows | batch(batch_size) %}
        {% set bindings = [] %}

        {% for row in chunk %}
          {% do bindings.extend(row) %}
        {% endfor %}

        {% set sql %}
            insert into {{ this.render() }} values
            {% for row in chunk -%}
                ({%- for column in agate_table.columns -%}
                    {%- if 'ISODate' in (column.data_type | string) -%}
                      cast(%s as date)
                    {%- elif 'date' in (column.name) -%}
                      cast(%s as date)
                    {%- else -%}
                    %s
                    {%- endif -%}
                    {%- if not loop.last%},{%- endif %}
                {%- endfor -%})
                {%- if not loop.last%},{%- endif %}
            {%- endfor %}
        {% endset %}

        {% for column in agate_table.columns  -%}
          {% do log(column.name ~ (column.data_type | string), True) %}
        {%- endfor %}

        {% do log(sql, True) %}
        {% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}

        {% if loop.index0 == 0 %}
            {% do statements.append(sql) %}
        {% endif %}
    {% endfor %}

    {# Return SQL so we can render it out into the compiled files #}
    {{ return(statements[0]) }}
{% endmacro %}
chinwobble added the bug and triage labels on Jan 12, 2021
jtcohen6 added the good_first_issue label and removed the triage label on Jan 12, 2021
jtcohen6 commented Jan 12, 2021

Hey @chinwobble, thanks for the detailed writeup and reproduction case. I assume you're running with Spark 3.x, since that's where we saw this issue first crop up; could you confirm?

Here are the statements that dbt seed --full-refresh runs with the column type overrides in dbt_project.yml, as you describe them in step 3 above. It would run the same statements without those overrides, too, since dbt uses the python agate library to infer (really guess at) data types based on the contents of the CSV file.

drop table dbt.loan_account;
create table dbt.loan_account (id string,name string,first_payment_date timestamp,maturity_date timestamp,loan_amount decimal(20,2),installment_amount decimal(20,2));
insert into dbt.loan_account values('1','LAI-1','2020-01-01', '2020-01-10','100', '100');
select * from dbt.loan_account;

This works in our local dockerized version, which uses Apache Spark 2.4.5, but it does not work when I run it against a Databricks cluster running Spark 3.0.1.

Instead, it does work if I add an override in dbt_project.yml that sets every column type to be string:

drop table dbt.loan_account;
create table dbt.loan_account (id string,name string,first_payment_date string,maturity_date string,loan_amount string,installment_amount string);
insert into dbt.loan_account values('1','LAI-1','2020-01-01', '2020-01-10','100', '100');
select * from dbt.loan_account;

Then, in a staging model, I can cast these string columns to be their "real" types.
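
For example, a staging model along these lines (an illustrative sketch with a hypothetical model name, assuming the column types configured above) would recover the intended types:

-- models/staging/stg_loan_account.sql (hypothetical)
select
    id,
    name,
    cast(first_payment_date as timestamp) as first_payment_date,
    cast(maturity_date as timestamp) as maturity_date,
    cast(loan_amount as decimal(20,2)) as loan_amount,
    cast(installment_amount as decimal(20,2)) as installment_amount
from {{ ref('loan_account') }}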

I agree, however, that the best solution here is to add those castings directly within the seed materialization. I think you had the right initial idea with the code to change:
https://github.com/fishtown-analytics/dbt-spark/blob/2f7c2dcb419b66d463bb83212287bd29280d6964/dbt/include/spark/macros/materializations/seed.sql#L13-L26

Crucially, we need to cast() every non-string column to its data type, not just timestamps. I think we could generalize that to "cast every column":

    {%- set column_override = model['config'].get('column_types', {}) -%}
    ...
        {% set sql %}
            insert into {{ this.render() }} values
            {% for row in chunk -%}
                ({%- for col_name in agate_table.column_names -%}
                    {%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
                    {%- set type = column_override.get(col_name, inferred_type) -%}
                      cast(%s as {{type}})
                    {%- if not loop.last%},{%- endif %}
                {%- endfor -%})
                {%- if not loop.last%},{%- endif %}
            {%- endfor %}
        {% endset %}

Is that a fix you'd be interested in contributing?

chinwobble commented Jan 12, 2021

@jtcohen6 Yes, I was testing on vanilla Spark 3.0.0.

Also unrelated, how were you able to get all the SQL that was generated?

  • When I look in the target folder, it doesn't have the generated SQL.
  • The Spark SQL UI doesn't show the SQL text for the generated statements.
  • dbt seed --show also doesn't show the SQL text submitted.
  • So I resorted to adding log statements inside the macro.

Is there an easier way that I'm not aware of?

chinwobble commented Jan 13, 2021

I tried adding those two lines of code to the macro and noticed some issues:

{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}

inferred_type is always set to string if you specify the column in +column_types: in dbt_project.yml.
This is because of how agate is used in dbt-core:
https://github.com/fishtown-analytics/dbt/blob/v0.18.1/core/dbt/context/providers.py#L732

column_override is not in scope when I tried it with dbt-spark 0.18.1.

For numeric data types we probably want to use %f instead of %s so they are rendered without the surrounding single quotes, which break in Spark 3.0.0.
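
A rough sketch of that suggestion, following the same data_type check used in the macro above (it assumes agate reports numeric columns with a Number data type and that the driver accepts %f as a placeholder):

{# fragment of the per-column placeholder logic inside the values loop #}
{%- if 'Number' in (column.data_type | string) -%}
    %f
{%- elif 'ISODate' in (column.data_type | string) -%}
    cast(%s as date)
{%- else -%}
    %s
{%- endif -%}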

jtcohen6 commented Jan 13, 2021

@chinwobble I grabbed that SQL from logs/dbt.log (or whatever you've configured in your log-path). The seed SQL should also appear in target/run/[project_name]/data/loan_account.csv.

inferred_type is always set to string if you specify the column in +column_types: in dbt_project.yml

This is why we prefer the user-configured column type (if available), and only use the inferred type if there is no type supplied by the user.

column_override is not in scope when I tried it with dbt-spark 0.18.1

You're right, sorry about that! There was a line missing in the code snippet pasted above. I just updated it.

Here's what I get back in the logs, which yields a successful load:

insert into dbt_jcohen.loan_account values
(cast(%s as bigint),cast(%s as string),cast(%s as date),cast(%s as date),cast(%s as bigint),cast(%s as bigint))

(%s is just a feature of how the ODBC driver handles SQL params; those are actually raw values.)

An alternative approach here would be to use adapter.get_columns_in_relation(this) within spark__load_csv_rows, to get the data types of the seed table as it already exists. I think the Jinja would be a bit trickier to write, and it offers only the slim benefit of maybe working even if a user updates the configured data type overrides between creating and populating the seed table, something that, regardless, is best addressed by dbt seed --full-refresh.
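
A rough, untested sketch of that alternative (it assumes the existing table's column order matches the CSV, and that each returned column exposes a data_type):

{%- set existing_columns = adapter.get_columns_in_relation(this) -%}
...
    ({%- for column in existing_columns -%}
        {# bind each value and cast it to the column's existing data type #}
        cast(%s as {{ column.data_type }})
        {%- if not loop.last -%},{%- endif -%}
    {%- endfor -%})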

chinwobble commented Jan 17, 2021

@jtcohen6 I'm trying to contribute this fix back into the repository.

I'm starting by creating unit tests for the seed macros:

  • spark__load_csv_rows
  • spark__reset_csv_table
  • spark__create_csv_table

I've created this test class:

# test_seed_macros

import unittest
from unittest import mock
import os
import re
from jinja2 import Environment, FileSystemLoader
from dbt.clients import agate_helper
from dbt.clients.jinja import MaterializationExtension
from tempfile import mkdtemp

SAMPLE_CSV_DATA = """a,b,c,d,e,f,g
1,n,test,3.2,20180806T11:33:29.320Z,True,NULL
2,y,asdf,900,20180806T11:35:29.320Z,False,a string"""

class TestSparkSeedMacros(unittest.TestCase):

    def setUp(self):
        self.jinja_env = Environment(loader=FileSystemLoader(
                                        ['dbt/include/spark/macros',
                                         'dbt/include/spark/macros/materializations']),
                                     extensions=['jinja2.ext.do', MaterializationExtension])

        self.config = {}
        self.default_context = {
            'validation': mock.Mock(),
            'model': mock.Mock(),
            'exceptions': mock.Mock(),
            'config': mock.Mock()
        }
        self.default_context['config'].get = lambda key, default=None, **kwargs: self.config.get(key, default)
        self.tempdir = mkdtemp()

    def __get_template(self, template_filename):
        return self.jinja_env.get_template(template_filename, globals=self.default_context)

    def __run_macro(self, template, name, agate_table):
        self.default_context['model'].alias = 'test'
        value = getattr(template.module, name)(self.default_context['model'], agate_table)
        return re.sub(r'\s\s+', ' ', value)

    def test_macros_load(self):
        self.jinja_env.get_template('seed.sql')

    def test_macros_create_csv_table(self):
        template = self.__get_template('seed.sql')

        path = os.path.join(self.tempdir, 'input.csv')
        with open(path, 'wb') as fp:
            fp.write(SAMPLE_CSV_DATA.encode('utf-8'))

        agate_table = agate_helper.from_csv(path, [])
        sql = self.__run_macro(template, 'spark__create_csv_table', agate_table).strip()

        self.assertEqual(sql, "create table my_table as select 1")

When I run this code I get the following error:

self = Undefined, args = (), kwargs = {}

    @internalcode
    def _fail_with_undefined_error(self, *args, **kwargs):
        """Raise an :exc:`UndefinedError` when operations are performed
        on the undefined value.
        """
>       raise self._undefined_exception(self._undefined_message)
E       jinja2.exceptions.UndefinedError: 'this' is undefined

.tox/unit/lib/python3.8/site-packages/jinja2/runtime.py:747: UndefinedError

I assume it's not working because of this snippet in seed.sql, where the this variable is not set anywhere in the Jinja environment:

create table {{ this.render() }}

Ideally I would just want integration tests that test the seed on Spark 2.4.x and Spark 3.0.0, but I'm not familiar with CircleCI and it seems tricky to set up locally.

jtcohen6 commented:

@chinwobble I really appreciate your instinct to start by writing unit tests for these macros.

Unfortunately, I think materialization testing requires a few too many dbt-specific syntactical pieces (like this, return, etc.) for unit testing with the standard Jinja templates plus only-the-needed plugins, in the vein of test_macros.py. I was running into the same issue over in #141.

Instead, we've had success historically in dbt-core (and the "core four" adapters) writing targeted integration tests, with projects reflecting the use cases and edge cases we'd expect users to encounter. As discussed over in #141, we should include the test.integration.base toolbox as a module within pytest-dbt-adapter. Then your seed test could look more like this one.

chinwobble commented:

@jtcohen6 As you suggested, I've tried to create some integration tests for the materializations.

First, I need to make sure that dbt-spark is tested on both Spark 3 and Spark 2.4.

I've created PR #145 to try to achieve this; however, the Hive metastore in Spark 3 isn't quite working.

Could you have a look at it?
