From 51a3017b96c78479055de73bae8fd85c7b793ef4 Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 13 Nov 2020 16:02:35 +0100 Subject: [PATCH 1/2] Override dbt_utils.concat & dbt_utils.surrogate_key macros to handle more than 100 columns on postgres #913 --- .../macros/cross_db_utils/concat.sql | 33 ++++++++++++ .../macros/cross_db_utils/surrogate_key.sql | 51 +++++++++++++++++++ .../transform_catalog/transform.py | 5 +- 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql create mode 100644 airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/surrogate_key.sql diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql new file mode 100644 index 000000000000..ab618cf2e63a --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql @@ -0,0 +1,33 @@ +{# + Overriding the following macro from dbt-utils: + https://github.com/fishtown-analytics/dbt-utils/blob/0.6.2/macros/cross_db_utils/concat.sql + To implement our own version of concat + Because on postgres, we cannot pass more than 100 arguments to a function +#} + +{% macro concat(fields) -%} + {{ adapter.dispatch('concat')(fields) }} +{%- endmacro %} + +{% macro default__concat(fields) -%} + concat({{ fields|join(', ') }}) +{%- endmacro %} + +{% macro alternative_concat(fields) %} + {{ fields|join(' || ') }} +{% endmacro %} + + +{% macro postgres__concat(fields) %} + {{ dbt_utils.alternative_concat(fields) }} +{% endmacro %} + + +{% macro redshift__concat(fields) %} + {{ dbt_utils.alternative_concat(fields) }} +{% endmacro %} + + +{% macro snowflake__concat(fields) %} + {{ dbt_utils.alternative_concat(fields) }} +{% endmacro %} diff --git 
a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/surrogate_key.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/surrogate_key.sql new file mode 100644 index 000000000000..72126f52043e --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/surrogate_key.sql @@ -0,0 +1,51 @@ +{# + Overriding the following macro from dbt-utils: + https://github.com/fishtown-analytics/dbt-utils/blob/0.6.2/macros/sql/surrogate_key.sql + So that it calls our own version of concat (see concat.sql) + Because on postgres, we cannot pass more than 100 arguments to a function +#} + +{%- macro surrogate_key(field_list) -%} + +{%- if varargs|length >= 1 or field_list is string %} + +{%- set error_message = ' +Warning: the `surrogate_key` macro now takes a single list argument instead of \ +multiple string arguments. Support for multiple string arguments will be \ +deprecated in a future release of dbt-utils. The {}.{} model triggered this warning. 
\ +'.format(model.package_name, model.name) -%} + +{%- do exceptions.warn(error_message) -%} + +{# first argument is not included in varargs, so add first element to field_list_xf #} +{%- set field_list_xf = [field_list] -%} + +{%- for field in varargs %} +{%- set _ = field_list_xf.append(field) -%} +{%- endfor -%} + +{%- else -%} + +{# if using list, just set field_list_xf as field_list #} +{%- set field_list_xf = field_list -%} + +{%- endif -%} + + +{%- set fields = [] -%} + +{%- for field in field_list_xf -%} + + {%- set _ = fields.append( + "coalesce(cast(" ~ field ~ " as " ~ dbt_utils.type_string() ~ "), '')" + ) -%} + + {%- if not loop.last %} + {%- set _ = fields.append("'-'") -%} + {%- endif -%} + +{%- endfor -%} + +{{dbt_utils.hash(concat(fields))}} + +{%- endmacro -%} \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py index c3e455857365..99e564af8b35 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py @@ -299,7 +299,10 @@ def process_node( node_properties = extract_node_properties(path=path, json_col=json_col, properties=properties) node_columns = ",\n ".join([sql for sql in node_properties.values()]) hash_node_columns = ",\n ".join([f"adapter.quote_as_configured('{column}', 'identifier')" for column in node_properties.keys()]) - hash_node_columns = jinja_call(f"dbt_utils.surrogate_key([\n {hash_node_columns}\n ])") + # Disable dbt_utils.surrogate_key for own version to fix a bug with Postgres (#913). 
+ # hash_node_columns = jinja_call(f"dbt_utils.surrogate_key([\n {hash_node_columns}\n ])") + # We should re-enable it when our PR to dbt_utils is merged + hash_node_columns = jinja_call(f"surrogate_key([\n {hash_node_columns}\n ])") hash_id = jinja_call(f"adapter.quote_as_configured('_{name}_hashid', 'identifier')") foreign_hash_id = jinja_call(f"adapter.quote_as_configured('_{name}_foreign_hashid', 'identifier')") emitted_col = "{},\n {} as {}".format( From 3a5f5606ae5d67f5a05477f90f3eee6e606a3e30 Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 13 Nov 2020 16:15:55 +0100 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version=20for=20base-normalization:?= =?UTF-8?q?=200.1.0=20=E2=86=92=200.1.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airbyte-integrations/bases/base-normalization/Dockerfile | 2 +- .../workers/normalization/DefaultNormalizationRunner.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 766cd3f49244..d2cec1c7bd22 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -16,5 +16,5 @@ WORKDIR /airbyte ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 3902cb2f05f6..bac690e5a88c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -43,7 +43,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner { private static 
final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class); - public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.0"; + public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.1"; private final DestinationType destinationType; private final ProcessBuilderFactory pbf;