Refactor Normalization to handle nested Streams in new Catalog API #2044
Conversation
@@ -57,9 +59,10 @@
   SNOWFLAKE
 }

-public DefaultNormalizationRunner(final DestinationType destinationType, final ProcessBuilderFactory pbf) {
+public DefaultNormalizationRunner(final DestinationType destinationType, final ProcessBuilderFactory pbf, boolean useDevVersion) {
do we need to keep this or was it just useful while you were doing manual testing?
It's open for discussion, I am thinking we can keep it...
The intended behavior is:
- to automatically use `normalization:dev` images when the destination is used with `dev`.
- if the destination is using a numbered version, then normalization also uses the declared numbered version.

It's just to avoid switching between numbered/dev image tags when working on a PR of normalization (I noticed @sherifnada does that often too) and having to remember to switch it back before merging (see the sketch below).
WDYT?
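
For illustration, the tag selection could look like this minimal Python sketch (the real logic is Java, in DefaultNormalizationRunner; the image name and pinned version here are assumptions):

```python
NORMALIZATION_IMAGE = "airbyte/normalization"
DECLARED_VERSION = "0.1.0"  # assumed pinned normalization version, for illustration

def normalization_image(use_dev_version: bool) -> str:
    # Mirror the destination's tag: a dev destination pulls normalization:dev,
    # otherwise pull the declared numbered normalization version.
    tag = "dev" if use_dev_version else DECLARED_VERSION
    return f"{NORMALIZATION_IMAGE}:{tag}"
```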
cool. let's work through this feature in a separate PR.
Resolved review threads (outdated):
- airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
- ...e-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/array.sql
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
    child_str = child[0:55]
else:
    child_str = child
for i in range(1, 100):
what is this doing? it's handling naming collisions of child tables, I think? I believe there's a real problem here, but can you help me understand exactly what it is and how it is being solved?
yes, it's handling name collisions from the currently processed catalog...
In the Stripe connector, for example, the `charges` stream contains the concept of `card` multiple times, and depending on where it comes from, it has different schemas:
- charges/card
- charges/source/card
- charges/payment_method_details/card

The "exploded" tables are:
- named after the nested column name (in my example, it's `card`). If I named them after the "path" to the nested column, it would make super long names that easily overflow the allowed character limits, and I'd hit even more naming problems (and potential collisions if truncating).
- if the name was already used by this stream or another stream in the same catalog, then it just adds an integer `i`, allowing up to 100 collisions.
- top-level stream names have priority over nested children, so they retain the non-suffixed versions (for example, I have a `charges` stream and a nested `customer.charges` in the `customer` stream).

Sometimes the collisions have the same schema (below with `address`) and could be unioned/merged into a single table, but I am leaving it to the user to do that extra step afterward... So if someone wants to merge the different `card` tables into a single one (or the `address` ones) by keeping only a subset of common fields or filling in with blanks, then it's up to them...
With this naming, it's also easy to retrieve all the `card` tables, since they are grouped next to each other alphabetically.
Of course, I haven't documented all this yet.
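
As a rough illustration of that collision scheme (a minimal Python sketch, not the actual transform.py code; the function name, suffix format, and `used_names` registry are assumptions):

```python
from typing import Set

def resolve_table_name(child: str, used_names: Set[str], max_len: int = 55) -> str:
    # Truncate long nested-column names to stay within identifier length limits.
    child_str = child[0:max_len] if len(child) > max_len else child
    if child_str not in used_names:
        used_names.add(child_str)
        return child_str
    # On collision, append an increasing integer suffix, allowing up to
    # 100 tables sharing the same base name in the catalog.
    for i in range(1, 100):
        candidate = f"{child_str}_{i}"
        if candidate not in used_names:
            used_names.add(candidate)
            return candidate
    raise ValueError(f"too many naming collisions for {child}")
```

Top-level stream names would be registered in `used_names` first, so nested children get the suffixed variants.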
discussed with Chris; he will create an issue for this. We can live with this for now but need to fix it quickly.
Issue created here: #2055
Resolved review threads:
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
@@ -46,7 +46,7 @@ public void createTableIfNotExists(JdbcDatabase database, String schemaName, Str
 final String createTableQuery = String.format(
     "CREATE TABLE IF NOT EXISTS %s.%s ( \n"
         + "%s VARCHAR PRIMARY KEY,\n"
-        + "\"%s\" VARIANT,\n"
+        + "%s VARIANT,\n"
why is this changing?
All destinations produce the `_airbyte_data` column without quotes, as a case-insensitive column.
@@ -29,14 +29,20 @@ clean-targets: # directories to be removed by `dbt clean`

 quoting:
   database: true
-  schema: true
+  schema: false
why?
I will add the same comment:
Temporarily disabling the behavior of the ExtendedNameTransformer, see issue #1785.
Since we don't use extended names for tables and schemas anymore but replace all special characters with `_`, dbt shouldn't use quoting when querying them, otherwise we run into case-sensitivity and quoting issues...
The destination may produce schema/table names that are case-insensitive, while dbt with quoting would query those schemas/tables case-sensitively... resulting in conflicts and exceptions (a sketch of the sanitizing idea follows below).
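
For illustration, the "replace all special characters with `_`" idea might look like this minimal Python sketch (not the actual ExtendedNameTransformer code; the function name and regex are assumptions):

```python
import re

def sanitize_identifier(name: str) -> str:
    # Replace anything that is not alphanumeric or underscore with "_",
    # so the identifier never needs quoting and stays safe on
    # case-insensitive destinations.
    return re.sub(r"[^A-Za-z0-9_]", "_", name)

# e.g. sanitize_identifier("charges/payment_method_details") == "charges_payment_method_details"
```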
Exciting that we're going to support nesting!!!!
Left some comments but will take another look tomorrow. I think my biggest takeaway right now is that this class is really complex/inaccessible and we should try to make it simpler. It doesn't necessarily have to happen in this PR since I know we are trying to push this out, but I do think it needs to be one of the top things to work on in this area
Resolved review threads:
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py (outdated)
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
@@ -139,6 +101,280 @@ def extract_schema(profiles_yml: dict) -> str:
     return profiles_yml["schema"]


+def generate_dbt_model(schema: str, output: str, integration_type: str, catalog: dict, json_col: str) -> Dict[str, Set[str]]:
- Shouldn't integration type be an enum instead of a string?
- Can you add a docstring explaining what each of these params are? At first look, it's really not obvious what `output` or `schema` or `json_col` are supposed to be (maybe the last one is just `json_column_name`?). See the docstring sketch below.
Resolved review threads (outdated):
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
sql += "\nfrom numbered where row_num = 1\n" | ||
if len(path) > 1: | ||
sql += "-- {} from {}\n ".format(name, "/".join(path)) | ||
output_sql_table(output, schema, sql_file_name, sql, path) |
it seems like the output of this method is basically a side effect, i.e. writing to the file system? Can we push that concern outside of this method? It will make it easier to write unit tests. (A sketch of that separation is below.)
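
One possible shape of that separation, as a minimal Python sketch (names like `generate_table_sql` and `write_sql_table` are illustrative, not the actual code):

```python
from pathlib import Path
from typing import List

def generate_table_sql(name: str, path: List[str], sql: str) -> str:
    # Pure function: finish building the SQL text, no file I/O,
    # so it can be unit-tested by asserting on the returned string.
    sql += "\nfrom numbered where row_num = 1\n"
    if len(path) > 1:
        sql += "-- {} from {}\n".format(name, "/".join(path))
    return sql

def write_sql_table(output_dir: str, sql_file_name: str, sql: str) -> None:
    # The file-system side effect is isolated here and easy to stub out.
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    (Path(output_dir) / f"{sql_file_name}.sql").write_text(sql)
```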
name,
table,
parent_hash_id: str,
inject_sql_prefix: str,
it's hard for me to conceptually understand what's happening in this method without a lot of mental gymnastics and poring over code. For example, it would really help me to have a mental model like:
"first we create an intermediate view containing XYZ, then another containing ABC, then we derive the final normalized view. Nested tables are handled via etc..."
As a side note, I've generally felt that the cost of "reloading" context for this class whenever I'm working on it/reading it is fairly large. I don't have a silver-bullet solution -- it's probably a combination of comments or docs plus potentially refactors to make it more obvious, but I think it's important to point this concern out.
Slowly getting there... the previous version was even worse; at least here I managed to separate the successive parts into different SQL files, yes.
The cool thing is that it creates more intermediate views (`ab1_`, `ab2_`, `ab3_`, and the final table) than before, but it's easy to switch back to a single SQL file with dbt's materialization option.
The steps are chained together in a transformation pipeline, and I will be able to describe what each "unit" does and choose to include or exclude each one (with its own testing etc?), but I haven't reached that point yet (see the sketch below).
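
A minimal Python sketch of such a staged pipeline (the `ab1`/`ab2`/`ab3` naming follows the comment above; the individual stage responsibilities are assumptions about typical normalization steps, not the actual code):

```python
from typing import Callable, List, Tuple

# Each stage maps (stream_name, upstream_ref) -> SQL text for one model.
Stage = Callable[[str, str], str]

def json_extract_stage(name: str, source: str) -> str:
    return f"-- {name}_ab1: unpack the raw JSON blob\nselect _airbyte_data from {source}"

def cast_types_stage(name: str, source: str) -> str:
    return f"-- {name}_ab2: cast columns to typed SQL\nselect * from {source}"

def hash_id_stage(name: str, source: str) -> str:
    return f"-- {name}_ab3: add a hash id\nselect * from {source}"

def build_pipeline(name: str, raw_table: str) -> List[Tuple[str, str]]:
    # Chain the stages: each intermediate model reads from the previous one,
    # and the final table selects from the last intermediate view.
    models: List[Tuple[str, str]] = []
    upstream = raw_table
    for i, stage in enumerate([json_extract_stage, cast_types_stage, hash_id_stage], start=1):
        model_name = f"{name}_ab{i}"
        models.append((model_name, stage(name, upstream)))
        upstream = model_name
    models.append((name, f"-- final table\nselect * from {upstream}"))
    return models
```

Each (model name, SQL) pair would become its own file, which is what makes it possible to test or exclude stages individually.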
Resolved review thread (outdated):
- airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py
Yes, I definitely agree. I need some time to deeply refactor/split the code properly... but because of time constraints every time I handle normalization projects, I haven't been able to revamp it more. It kind of grew organically, sprouting way too many arguments in the functions, etc.
/test connector=destination-postgres

/test connector=destination-bigquery

/test connector=destination-snowflake

/test connector=destination-redshift

Force-pushed from b50ce3c to 27cc7b1.

/test connector=destination-bigquery

This reverts commit 61f54e7.

Force-pushed from f122e31 to 51c39fd.
What
Closes #886 and related issues.
Closes #1426

How
Create tables for nested columns.

Pre-merge Checklist

Recommended reading order
airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py