Refactor Normalization to handle nested Streams in new Catalog API (#…
ChristopheDuong authored Feb 15, 2021
1 parent 8df1a13 commit d799b45
Showing 17 changed files with 679 additions and 364 deletions.
@@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.1.17",
"dockerImageTag": "0.1.18",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
@@ -21,7 +21,7 @@
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.1.17
dockerImageTag: 0.1.18
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -19,5 +19,5 @@ WORKDIR /airbyte

ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/normalization
@@ -29,14 +29,22 @@ clean-targets: # directories to be removed by `dbt clean`

quoting:
database: true
schema: true
# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785)
# all schemas should be unquoted
schema: false
identifier: true

# You can define configurations for models in the `source-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
+materialized: table
airbyte_utils:
generated:
airbyte_views:
+materialized: view
airbyte_tables:
+materialized: table
+materialized: table

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
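
The quoting and materialization settings above are folder-scoped: schemas are now left unquoted (see issue #1785), and where a generated model file is written decides how it is built, with files under generated/airbyte_views materialized as views and files under generated/airbyte_tables materialized as tables. As a minimal sketch (the file path, source name, and stream name are hypothetical, not taken from this commit), a model placed under models/generated/airbyte_views/ would build as a view without needing its own config block:

-- models/generated/airbyte_views/some_stream_ab1.sql (hypothetical path)
-- no explicit config(materialized=...) block is needed: the airbyte_views folder already sets it
select
    _airbyte_ab_id,
    _airbyte_data,
    _airbyte_emitted_at
from {{ source('airbyte_raw', '_airbyte_raw_some_stream') }}  -- source name is illustrative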
@@ -6,28 +6,91 @@
- postgres: unnest() -> https://www.postgresqltutorial.com/postgresql-array/
#}

{# flatten ------------------------------------------------- #}
{# cross_join_unnest ------------------------------------------------- #}

{% macro unnest(array_col) -%}
{{ adapter.dispatch('unnest')(array_col) }}
{% macro cross_join_unnest(table_name, array_col) -%}
{{ adapter.dispatch('cross_join_unnest')(table_name, array_col) }}
{%- endmacro %}

{% macro default__unnest(array_col) -%}
unnest({{ adapter.quote_as_configured(array_col, 'identifier')|trim }})
{% macro default__cross_join_unnest(table_name, array_col) -%}
{% do exceptions.warn("Undefined macro cross_join_unnest for this destination engine") %}
{%- endmacro %}

{% macro bigquery__unnest(array_col) -%}
unnest({{ adapter.quote_as_configured(array_col, 'identifier')|trim }})

{% macro bigquery__cross_join_unnest(table_name, array_col) -%}
cross join unnest({{ array_col }}) as _airbyte_data
{%- endmacro %}

{% macro postgres__cross_join_unnest(table_name, array_col) -%}
cross join jsonb_array_elements(
case jsonb_typeof({{ array_col }})
when 'array' then {{ array_col }}
else '[]' end
) as _airbyte_data
{%- endmacro %}

{% macro redshift__cross_join_unnest(table_name, array_col) -%}
left join joined on _airbyte_{{ table_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}

{% macro snowflake__cross_join_unnest(table_name, array_col) -%}
cross join table(flatten({{ array_col }})) as _airbyte_data
{%- endmacro %}

{# unnested_column_value ------------------------------------------------- #}

{% macro unnested_column_value(column_col) -%}
{{ adapter.dispatch('unnested_column_value')(column_col) }}
{%- endmacro %}

{% macro default__unnested_column_value(column_col) -%}
{{ column_col }}
{%- endmacro %}

{% macro postgres__unnest(array_col) -%}
unnest({{ adapter.quote_as_configured(array_col, 'identifier')|trim }})
{% macro snowflake__unnested_column_value(column_col) -%}
{{ column_col }}.value
{%- endmacro %}

{% macro redshift__unnest(array_col) -%}
-- FIXME to implement as described here? https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/
{% macro redshift__unnested_column_value(column_col) -%}
_airbyte_data
{%- endmacro %}

{% macro snowflake__unnest(array_col) -%}
table(flatten({{ adapter.quote_as_configured(array_col, 'identifier')|trim }}))
{# unnest_cte ------------------------------------------------- #}

{% macro unnest_cte(table_name, column_col) -%}
{{ adapter.dispatch('unnest_cte')(table_name, column_col) }}
{%- endmacro %}

{% macro default__unnest_cte(table_name, column_col) -%}{%- endmacro %}

{# -- based on https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/ #}
{% macro redshift__unnest_cte(table_name, column_col) -%}
{%- if not execute -%}
{{ return('') }}
{% endif %}
{%- call statement('max_json_array_length', fetch_result=True) -%}
with max_value as (
select max(json_array_length({{ column_col }}, true)) as max_number_of_items
from {{ ref(table_name) }}
)
select
case when max_number_of_items is not null and max_number_of_items > 1
then max_number_of_items
else 1 end as max_number_of_items
from max_value
{%- endcall -%}
{%- set max_length = load_result('max_json_array_length') -%}
with numbers as (
{{dbt_utils.generate_series(max_length["data"][0][0])}}
),
joined as (
select
_airbyte_{{ table_name }}_hashid as _airbyte_hashid,
json_extract_array_element_text({{ column_col }}, numbers.generated_number::int - 1, true) as _airbyte_data
from {{ ref(table_name) }}
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in {{ table_name }}.{{ column_col }}
where numbers.generated_number <= json_array_length({{ column_col }}, true)
)
{%- endmacro %}
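
Read together, the three macro families above split the work of exploding a nested array per destination: unnest_cte emits an optional CTE prefix (only Redshift needs one), cross_join_unnest emits the join clause, and unnested_column_value adapts how the exploded element is referenced. A rough sketch of how a generated model might wire them together (stream and column names are hypothetical, and the SQL actually generated by this commit may differ):

{{ unnest_cte('parent_stream', 'children') }}
select
    _airbyte_parent_stream_hashid,
    {{ json_extract_scalar(unnested_column_value('_airbyte_data'), ['name']) }} as name
from {{ ref('parent_stream') }}
{{ cross_join_unnest('parent_stream', 'children') }}

On BigQuery this renders the cross join unnest(...) form, on Snowflake the table(flatten(...)) form aliased as _airbyte_data (hence the .value accessor), and on Redshift it relies on the numbers/joined CTEs produced by unnest_cte plus a left join back on the parent hashid.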
@@ -0,0 +1,21 @@
{# json ------------------------------------------------- #}

{%- macro type_json() -%}
{{ adapter.dispatch('type_json')() }}
{%- endmacro -%}

{% macro default__type_json() %}
string
{% endmacro %}

{%- macro redshift__type_json() -%}
varchar
{%- endmacro -%}

{% macro postgres__type_json() %}
jsonb
{% endmacro %}

{% macro snowflake__type_json() %}
variant
{% endmacro %}
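
This new type_json macro centralizes the warehouse-specific JSON column type. A hedged illustration of where it would typically be used (the column name is illustrative, not from this commit):

cast(_airbyte_data as {{ type_json() }}) as _airbyte_data
-- renders as variant on Snowflake, jsonb on Postgres, varchar on Redshift,
-- and string for adapters without an override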
@@ -3,7 +3,7 @@
- Bigquery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
- Snowflake: JSON_EXTRACT_PATH_TEXT( <column_identifier> , '<path_name>' ) -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, …] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...]]) -> https://www.postgresql.org/docs/12/functions-json.html
- Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...}}) -> https://www.postgresql.org/docs/12/functions-json.html
#}

{# format_json_path -------------------------------------------------- #}
@@ -38,23 +38,23 @@
{%- endmacro %}

{% macro default__json_extract(json_column, json_path_list) -%}
json_extract({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro bigquery__json_extract(json_column, json_path_list) -%}
json_extract({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro postgres__json_extract(json_column, json_path_list) -%}
jsonb_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract(json_column, json_path_list) -%}
case when json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) != '' then json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) end
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}) end
{%- endmacro %}

{% macro snowflake__json_extract(json_column, json_path_list) -%}
json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}

{# json_extract_scalar ------------------------------------------------- #}
@@ -64,23 +64,23 @@
{%- endmacro %}

{% macro default__json_extract_scalar(json_column, json_path_list) -%}
json_extract_scalar({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract_scalar({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro bigquery__json_extract_scalar(json_column, json_path_list) -%}
json_extract_scalar({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract_scalar({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro postgres__json_extract_scalar(json_column, json_path_list) -%}
jsonb_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }},{{ format_json_path(json_path_list) }})
jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
case when json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) != '' then json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) end
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}) end
{%- endmacro %}

{% macro snowflake__json_extract_scalar(json_column, json_path_list) -%}
json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
to_varchar(get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }}))
{%- endmacro %}

{# json_extract_array ------------------------------------------------- #}
@@ -90,21 +90,21 @@
{%- endmacro %}

{% macro default__json_extract_array(json_column, json_path_list) -%}
json_extract_array({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract_array({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro bigquery__json_extract_array(json_column, json_path_list) -%}
json_extract_array({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract_array({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro postgres__json_extract_array(json_column, json_path_list) -%}
jsonb_array_elements(jsonb_extract_path({{ adapter.quote_as_configured(json_column, 'identifier')|trim }},{{ format_json_path(json_path_list) }})
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_array(json_column, json_path_list) -%}
json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro snowflake__json_extract_array(json_column, json_path_list) -%}
json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}
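
The json_extract family now takes the raw column expression as-is instead of routing it through adapter.quote_as_configured, and Snowflake goes through parse_json/get_path rather than json_extract_path_text. An illustrative call site (the raw table and field names are hypothetical, not from this commit):

select
    {{ json_extract_scalar('_airbyte_data', ['id']) }} as id,
    {{ json_extract('_airbyte_data', ['profile']) }} as profile,
    {{ json_extract_array('_airbyte_data', ['children']) }} as children
from _airbyte_raw_some_stream  -- hypothetical raw table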
@@ -12,6 +12,19 @@
case when {{ boolean_column }} then 'true' else 'false' end
{%- endmacro %}

{# array_to_varchar ------------------------------------------------- #}
{% macro array_to_varchar(array_column) -%}
{{ adapter.dispatch('array_to_varchar')(array_column) }}
{%- endmacro %}

{% macro default__array_to_varchar(array_column) -%}
{{ array_column }}
{%- endmacro %}

{% macro bigquery__array_to_varchar(array_column) -%}
array_to_string({{ array_column }}, "|", "")
{%- endmacro %}

{# cast_to_boolean ------------------------------------------------- #}
{% macro cast_to_boolean(field) -%}
{{ adapter.dispatch('cast_to_boolean')(field) }}
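
array_to_varchar above is a pass-through on every warehouse except BigQuery, where the array must be flattened to a string explicitly. A small hedged example of combining it with json_extract_array when a nested array needs to be stringified (column and field names are illustrative):

{{ array_to_varchar(json_extract_array('_airbyte_data', ['tags'])) }} as tags
-- BigQuery: array_to_string(json_extract_array(...), "|", "")
-- other warehouses: the extracted array expression is passed through unchanged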
@@ -0,0 +1,4 @@
-- see https://docs.getdbt.com/docs/building-a-dbt-project/building-models/using-custom-schemas/#an-alternative-pattern-for-generating-schema-names
{% macro generate_schema_name(custom_schema_name, node) -%}
{{ generate_schema_name_for_env(custom_schema_name, node) }}
{%- endmacro %}
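
The override above simply delegates to dbt's built-in generate_schema_name_for_env. Paraphrasing the linked dbt documentation (this logic is not part of the diff itself), the effect is roughly:

{# equivalent logic, paraphrased from the dbt docs linked above -- illustration only #}
{% macro illustration_generate_schema_name(custom_schema_name, node) -%}
    {%- if target.name == 'prod' and custom_schema_name is not none -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ target.schema }}
    {%- endif -%}
{%- endmacro %}

so custom schemas only take effect on the prod target; every other target builds into its default schema.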
@@ -197,6 +197,7 @@
"INITIALLY",
"INNER",
"INTERSECT",
"INTERVAL",
"INTO",
"IS",
"ISNULL",