
Feature: faster snowflake catalogs (#2009) #2037

Merged
merged 4 commits into dev/barbara-gittings from feature/faster-snowflake-catalogs on Feb 6, 2020

Conversation

@beckjake (Contributor) commented Jan 9, 2020

(Tentatively) fix #2009

  • Thread catalog queries across information schemas
  • The get_catalog macro now takes an information schema and a list of schemas in that information schema
  • Filtering happens in the database instead of in Python

To be honest, I'm not totally sure about the BigQuery impact of these changes :( The BigQuery code does some funky stuff to handle the fact that the information schema is sometimes per-schema on that database, and I could easily have screwed it up, in which case we'd issue too many queries.
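As a rough sketch of the approach (illustrative only — schema_map maps each information schema to its set of schemas, and _get_one_catalog stands in for the per-information-schema query; neither name is guaranteed to match the final code exactly):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def build_catalog(adapter, schema_map, manifest, max_workers=4):
        # Run one catalog query per information schema concurrently,
        # rather than a single query unioned across every schema.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(adapter._get_one_catalog, info, schemas, manifest)
                for info, schemas in schema_map.items()
                if len(schemas) > 0
            ]
            # Each result is one catalog table; the caller merges them.
            return [future.result() for future in as_completed(futures)]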

@beckjake requested a review from drewbanin January 9, 2020 21:11
@cla-bot added the cla:yes label Jan 9, 2020
@beckjake force-pushed the feature/faster-snowflake-catalogs branch from 3f09b30 to c6f4434 on January 9, 2020 21:18
and sch.nspname not like 'pg\_%' -- avoid postgres system schemas, '_' is a wildcard so escape it
where (
{%- for schema in schemas -%}
sch.nspname = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
Contributor

should these predicates be case-insensitive on pg/redshift? I think that would mirror the existing behavior

join columns using ("table_database", "table_schema", "table_name")
where (
{%- for schema in schemas -%}
"table_schema" = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
Contributor

same here - we'd want to make this case-insensitive, right?

executor.submit(self._get_one_catalog, info, schemas, manifest)
for info, schemas in schema_map.items() if len(schemas) > 0
]
for future in as_completed(futures):
Contributor

what would happen here if one of these futures fails (eg. with a database error)?

Contributor Author

Well, this is my expectation:

  • future.result() will raise the DatabaseException
  • the exception will go through get_catalog, etc
  • eventually dbt will hit the context manager that cancels connections
  • ultimately dbt exits with an error about the DatabaseException

If we wanted, we could change it to check if the result was an exception, and if so store it until the end so all the threads are done. Then we could just raise the first one, or something.
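A rough sketch of that alternative (illustrative only, not the code in this PR): drain every future, hold on to any exceptions, and only raise once all of the threads have finished.

    from concurrent.futures import as_completed

    def collect_catalogs(futures):
        results = []
        exceptions = []
        for future in as_completed(futures):
            exc = future.exception()
            if exc is None:
                results.append(future.result())
            else:
                exceptions.append(exc)
        if exceptions:
            # every thread has completed by this point; surface the first failure
            raise exceptions[0]
        return results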

Contributor

If we wanted, we could change it to check if the result was an exception, and if so store it until the end so all the threads are done. Then we could just raise the first one, or something.

Yeah - I like this idea! I was initially worried that an exception here could cause dbt to hang, but that's definitely not the case. I was getting mixed up with how we do exception handling in the multithreaded job runner.

I think it would be great if the docs generate command wrote out a catalog.json file if any of the catalog queries succeed. This has proven to be a gnarly issue on BigQuery in particular, in which obscure additional permissions are required to fetch data from information schema tables.

Check out some relevant threads from dbt Slack:

[Three screenshots of dbt Slack threads from January 31, 2020]

I'm picturing something where we catch DatabaseExceptions raised from these futures and log an error message out, but continue waiting for the other futures to return. If any of them return successfully, then we can proceed on to writing out the catalog from the task. After the catalog is written, the task should exit with a nonzero exit code.

Do you buy that?
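For the sake of discussion, the proposed flow might look something like this (a hedged sketch with made-up names; it assumes the catalog call returns both the merged table and the list of exceptions it caught, and the logger and writer are stand-ins for dbt's own machinery):

    def run_generate(adapter, manifest, logger, write_catalog):
        catalogs, exceptions = adapter.get_catalog(manifest)
        for exc in exceptions:
            logger.error('Encountered an error while generating catalog: {}'.format(exc))
        # still write out whatever succeeded
        write_catalog(catalogs)
        # report failure (nonzero exit) if any catalog query errored
        return len(exceptions) == 0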

Contributor Author

I'm picturing something where we catch DatabaseExceptions raised from these futures and log an error message out, but continue waiting for the other futures to return. If any of them return successfully, then we can proceed on to writing out the catalog from the task. After the catalog is written, the task should exit with a nonzero exit code.

Sure, we can do that. We should probably make it a warning I guess, so people can opt-in to it being fatal with --strict.

Contributor

Yeah, good idea!

Jacob Beck added 2 commits January 31, 2020 10:53
- Thread information schema queries, one per db
- Pass schema list into catalog queries and filter on that in SQL
- break the existing interface for get_catalog (sorry, not sorry)
@beckjake force-pushed the feature/faster-snowflake-catalogs branch from c6f4434 to 12f1188 on January 31, 2020 18:34
@beckjake force-pushed the feature/faster-snowflake-catalogs branch from 12f1188 to c1af3ab on January 31, 2020 18:52
@beckjake marked this pull request as ready for review January 31, 2020 18:53
@beckjake requested a review from drewbanin February 3, 2020 18:00
@drewbanin (Contributor) left a comment

This is looking really, really slick. Some unfortunate agate behaviors are preventing me from doing more complete timing tests. Let me know what you think about the comments in here.

if exceptions:
logger.error(
'dbt encountered {} failure{} while writing the catalog'
.format(len(exceptions), (len(exceptions) == 1) * 's')
Contributor

Do we want != 1 here?

Contributor

also, this is exceedingly clever

Contributor

also, this whole flow looks great in practice!

Encountered an error while generating catalog: Database Error
  Access Denied: Table the-psf:INFORMATION_SCHEMA.SCHEMATA: User does not have permission to query table the-psf:INFORMATION_SCHEMA.SCHEMATA.
21:39:33 | Catalog written to /Users/drew/fishtown/clients/bq/target/catalog.json
dbt encountered 1 failures while writing the catalog

Can we just move this log line to be up above the Catalog written to... line? I want to make it clear that the catalog has still been written despite the errors.

Contributor Author

Hah, yes, it should be !=1. And it is exceedingly clever, just a stupid pluralization hack I remember from a long time ago.
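For reference, the trick works because bool is an int subclass, so the comparison multiplies the suffix by 0 or 1; the only bug is that == was used where != is needed:

    count = 1
    '{} failure{}'.format(count, (count != 1) * 's')   # '1 failure'
    count = 3
    '{} failure{}'.format(count, (count != 1) * 's')   # '3 failures'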

# calculate the possible schemas for a given schema name
all_schema_names: Set[str] = set()
for schema in schemas:
all_schema_names.update({schema, schema.lower(), schema.upper()})
Contributor

It looks to me like we're doing case-insensitive schema filtering in the catalog queries for every plugin -- do we need to pass in a set of the lower/upper cased variants of the schema name here? In dev, I see catalog SQL like this (pg):

    where (upper(sch.nspname) = upper('SNAPSHOTS') or upper(sch.nspname) = upper('TEST_SCHEMA') or upper(sch.nspname) = upper('snapshots') or upper(sch.nspname) = upper('test_schema'))

Contributor Author

Ooh, you're right - this is why the comparison was case-sensitive in catalog.sql before. I'll remove this.

Contributor

that makes a ton of sense - i didn't put those two things together!

# we want to re-raise on ctrl+c and BaseException
if exc is None:
catalog = future.result()
catalogs = agate.Table.merge([catalogs, catalog])
Contributor

Agate seems unhappy about this on Snowflake. I see:

2020-02-04 02:07:41,316840 (MainThread): Tables contain columns with the same names, but different types.
2020-02-04 02:07:41,322568 (MainThread): Traceback (most recent call last):
  File "/Users/drew/fishtown/dbt/core/dbt/main.py", line 81, in main
    results, succeeded = handle_and_check(args)
  File "/Users/drew/fishtown/dbt/core/dbt/main.py", line 159, in handle_and_check
    task, res = run_from_args(parsed)
  File "/Users/drew/fishtown/dbt/core/dbt/main.py", line 212, in run_from_args
    results = task.run()
  File "/Users/drew/fishtown/dbt/core/dbt/task/generate.py", line 214, in run
    catalog_table, exceptions = adapter.get_catalog(self.manifest)
  File "/Users/drew/fishtown/dbt/core/dbt/adapters/base/impl.py", line 1061, in get_catalog
    catalogs, exceptions = catch_as_completed(futures)
  File "/Users/drew/fishtown/dbt/core/dbt/adapters/base/impl.py", line 1151, in catch_as_completed
    catalogs = agate.Table.merge([catalogs, catalog])
  File "/Users/drew/fishtown/dbt/env/lib/python3.7/site-packages/agate/table/merge.py", line 45, in merge
    raise DataTypeError('Tables contain columns with the same names, but different types.')
agate.exceptions.DataTypeError: Tables contain columns with the same names, but different types.

In this case, the error is super annoying. One information_schema contains a table with a clustering key defined (of type string). The other information_schema that gets merged in does not have any clustering keys on any tables, so every value in the stats:clustering_key:value column is None, which I guess Agate assigned to an integer-y type?

I ran some debug code to see what was going wrong here, and I saw that the types for catalog and catalogs were:

'stats:clustering_key:value': {<agate.data_types.number.Number object at 0x109d26a10>,
                               <agate.data_types.text.Text object at 0x10a02c390>}

While I saw this on Snowflake, I have to imagine we'd see similar things on other databases. Any good ideas about what to do about this one? My (not ideal) thinking is that we could push the raw results into a list, then convert that raw list of lists back into a Table...
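A minimal reproduction of the clash, assuming two catalog tables whose stats:clustering_key:value column ends up typed differently (Text where real values exist, and Number standing in for whatever agate inferred over the all-None column):

    import agate

    has_values = agate.Table(
        [['model_a', 'LINEAR(ID)']],
        ['table_name', 'stats:clustering_key:value'],
        [agate.Text(), agate.Text()],
    )
    all_null = agate.Table(
        [['model_b', None]],
        ['table_name', 'stats:clustering_key:value'],
        [agate.Text(), agate.Number()],  # stand-in for the inferred type
    )

    # raises agate.exceptions.DataTypeError: Tables contain columns with the
    # same names, but different types.
    merged = agate.Table.merge([has_values, all_null])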

Contributor Author

I have a fix for this (roll my own merge function that tracks null-ness). I think it's actually pretty ok.
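A rough sketch of that kind of merge (not the exact implementation that landed): when a column is entirely null in one table, let the merged table take whatever type the other tables report for that column.

    import agate

    def merge_null_tolerant(tables):
        column_names = list(tables[0].column_names)
        column_types = []
        for name in column_names:
            # prefer the type from a table where this column has real values;
            # a column that is null everywhere falls back to Text
            chosen = agate.Text()
            for table in tables:
                if any(v is not None for v in table.columns[name].values()):
                    chosen = table.column_types[list(table.column_names).index(name)]
                    break
            column_types.append(chosen)

        rows = [tuple(row) for table in tables for row in table.rows]
        return agate.Table(rows, column_names, column_types)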

Added a custom table merge implementation that tracks if a row is all null and merges those as "any type".
 - added unit tests for that!
Removed some schema casing things
fixed pluralization (it was reversed)
@beckjake force-pushed the feature/faster-snowflake-catalogs branch from 5b36b7d to 04bc2a8 on February 4, 2020 14:48
@beckjake requested a review from drewbanin February 4, 2020 16:07
@drewbanin (Contributor) left a comment

LGTM! I tested this out on our Snowflake... not scientific at all, but it looks like the information schema queries on Snowflake went from 23s to 11s using this branch :)

The much, much bigger impact of this change, though, is that individual catalog queries can fail without bricking the generation of the catalog.json file. This will be great on BigQuery where:

  1. the previous union over all datasets exhausted the compilation memory that BQ allocated (and this query would fail), or
  2. an individual catalog query would fail b/c of bad permissions, bricking the docs generation!

@beckjake merged commit 0df49c5 into dev/barbara-gittings Feb 6, 2020
@beckjake deleted the feature/faster-snowflake-catalogs branch February 6, 2020 14:37
@mferryRV

@beckjake @drewbanin - is there any way to run this prior to your 0.16.0 release? We just did a load of work to add sources to dbt and now docs won't run 😅

@drewbanin
Contributor

Hey @mferryRV - I'm just going through some GitHub issue notifications I've received and wanted to make sure you got an answer here too (I do think we discussed on Slack!)

Try building a virtualenv and installing a pre-release of dbt:

python3 -m venv dbt-env
source dbt-env/bin/activate
pip install dbt==0.16.0b1

Successfully merging this pull request may close these issues: Reduce memory consumption in docs generation