diff --git a/airbyte-config/init/src/main/resources/icons/duckdb.svg b/airbyte-config/init/src/main/resources/icons/duckdb.svg new file mode 100644 index 000000000000..ceb66d279033 --- /dev/null +++ b/airbyte-config/init/src/main/resources/icons/duckdb.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 002404443f58..5feb6d6b217b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -443,3 +443,15 @@ dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.com/integrations/destinations/weaviate releaseStage: alpha +- name: DuckDB + destinationDefinitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610 + dockerRepository: airbyte/destination-duckdb + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/duckdb + icon: duckdb.svg + normalizationConfig: + normalizationRepository: airbyte/normalization-duckdb + normalizationTag: 0.2.25 + normalizationIntegrationType: duckdb + supportsDbt: true + releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 94ddbba34d97..bafa4aed0fa0 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -7347,3 +7347,30 @@ supported_destination_sync_modes: - "append" - "overwrite" +- dockerImage: "airbyte/destination-duckdb:0.1.0" + spec: + documentationUrl: "https://docs.airbyte.io/integrations/destinations/duckdb" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "DuckDB Destination Spec" + type: "object" + required: + - "destination_path" + additionalProperties: false + properties: + destination_path: + type: "string" + description: "Path to the destination.duckdb file. The file will be placed\ + \ inside that local mount. 
For more information check out our docs" + example: "/local/destination.duckdb" + schema: + type: "string" + description: "database schema, default for duckdb is main" + example: "main" + supportsIncremental: true + supportsNormalization: true + supportsDBT: true + supported_destination_sync_modes: + - "overwrite" + - "append" diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index a95816c2d0b9..4693b612adc3 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -81,6 +81,10 @@ task airbyteDockerTiDB(type: Exec, dependsOn: checkSshScriptCopy) { configure buildAirbyteDocker('tidb') dependsOn assemble } +task airbyteDockerDuckDB(type: Exec, dependsOn: checkSshScriptCopy) { + configure buildAirbyteDocker('duckdb') + dependsOn assemble +} airbyteDocker.dependsOn(airbyteDockerMSSql) airbyteDocker.dependsOn(airbyteDockerMySql) @@ -89,6 +93,7 @@ airbyteDocker.dependsOn(airbyteDockerClickhouse) airbyteDocker.dependsOn(airbyteDockerSnowflake) airbyteDocker.dependsOn(airbyteDockerRedshift) airbyteDocker.dependsOn(airbyteDockerTiDB) +airbyteDocker.dependsOn(airbyteDockerDuckDB) task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) { module = "pytest" @@ -104,6 +109,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-clickhouse:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-tidb:airbyteDocker' + dependsOn ':airbyte-integrations:connectors:destination-duckdb:airbyteDocker' } // not really sure what this task does differently from customIntegrationTestPython, but it seems to also run integration tests @@ -118,6 +124,7 @@ project.tasks.findByName('_customIntegrationTestsCoverage').dependsOn ':airbyte- project.tasks.findByName('_customIntegrationTestsCoverage').dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker' project.tasks.findByName('_customIntegrationTestsCoverage').dependsOn ':airbyte-integrations:connectors:destination-clickhouse:airbyteDocker' project.tasks.findByName('_customIntegrationTestsCoverage').dependsOn ':airbyte-integrations:connectors:destination-tidb:airbyteDocker' +project.tasks.findByName('_customIntegrationTestsCoverage').dependsOn ':airbyte-integrations:connectors:destination-duckdb:airbyteDocker' // DATs have some additional tests that exercise normalization code paths, // so we want to run these in addition to the base-normalization integration tests. diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template-duckdb/dbt_project.yml b/airbyte-integrations/bases/base-normalization/dbt-project-template-duckdb/dbt_project.yml new file mode 100755 index 000000000000..7631ef356dc9 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template-duckdb/dbt_project.yml @@ -0,0 +1,63 @@ +# This file is necessary to install dbt-utils with dbt deps +# the content will be overwritten by the transform function + +# Name your package! Package names should contain only lowercase characters +# and underscores. A good package name should reflect your organization's +# name or the intended use of these models +name: "airbyte_utils" +version: "1.0" +config-version: 2 + +# This setting configures which "profile" dbt uses for this project. 
Profiles contain +# database connection information, and should be configured in the ~/.dbt/profiles.yml file +profile: "normalize" + +# These configurations specify where dbt should look for different types of files. +# The `model-paths` config, for example, states that source models can be found +# in the "models/" directory. You probably won't need to change these! +model-paths: ["models"] +docs-paths: ["docs"] +analysis-paths: ["analysis"] +test-paths: ["tests"] +seed-paths: ["data"] +macro-paths: ["macros"] + +target-path: "../build" # directory which will store compiled SQL files +log-path: "../logs" # directory which will store DBT logs +packages-install-path: "/dbt" # directory which will store external DBT dependencies + +clean-targets: # directories to be removed by `dbt clean` + - "build" + - "dbt_modules" + +quoting: + database: true + # Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785) + # all schemas should be unquoted + schema: false + identifier: true + +# You can define configurations for models in the `model-paths` directory here. +# Using these configurations, you can enable or disable models, change how they +# are materialized, and more! +models: + airbyte_utils: + +materialized: table + generated: + airbyte_ctes: + +tags: airbyte_internal_cte + +materialized: ephemeral + airbyte_incremental: + +tags: incremental_tables + +materialized: incremental + +on_schema_change: sync_all_columns + airbyte_tables: + +tags: normalized_tables + +materialized: table + airbyte_views: + +tags: airbyte_internal_views + +materialized: view + +dispatch: + - macro_namespace: dbt_utils + search_order: ["airbyte_utils", "dbt_utils"] diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template-duckdb/packages.yml b/airbyte-integrations/bases/base-normalization/dbt-project-template-duckdb/packages.yml new file mode 100755 index 000000000000..33b4edd58c8c --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template-duckdb/packages.yml @@ -0,0 +1,5 @@ +# add dependencies. these will get pulled during the `dbt deps` process. 
+ +packages: + - git: "https://github.com/fishtown-analytics/dbt-utils.git" + revision: 0.8.2 diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/array.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/array.sql index 56ab17ce9af6..c2ed61aa061e 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/array.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/array.sql @@ -46,6 +46,10 @@ left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid {%- endmacro %} +{% macro duckdb__cross_join_unnest(stream_name, array_col) -%} + left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid +{%- endmacro %} + {% macro redshift__cross_join_unnest(stream_name, array_col) -%} left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid {%- endmacro %} @@ -95,6 +99,10 @@ _airbyte_nested_data {%- endmacro %} +{% macro duckdb__unnested_column_value(column_col) -%} + _airbyte_nested_data +{%- endmacro %} + {% macro oracle__unnested_column_value(column_col) -%} {{ column_col }} {%- endmacro %} @@ -193,3 +201,7 @@ joined as ( {% macro tidb__unnest_cte(from_table, stream_name, column_col) -%} {{ mysql__unnest_cte(from_table, stream_name, column_col) }} {%- endmacro %} + +{% macro duckdb__unnest_cte(from_table, stream_name, column_col) -%} + {{ mysql__unnest_cte(from_table, stream_name, column_col) }} +{%- endmacro %} diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql index 1148a04cca70..aab42ca3b964 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/concat.sql @@ -30,3 +30,7 @@ {% macro tidb__concat(fields) -%} concat({{ fields|join(', ') }}) {%- endmacro %} + +{% macro duckdb__concat(fields) -%} + concat({{ fields|join(', ') }}) +{%- endmacro %} diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql index 42f5312b054f..16015a1410e8 100755 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql @@ -44,6 +44,9 @@ json {%- endmacro -%} +{%- macro duckdb__type_json() -%} + json +{%- endmacro -%} {# string ------------------------------------------------- #} @@ -72,6 +75,10 @@ char(1000) {%- endmacro -%} +{%- macro duckdb__type_string() -%} + VARCHAR +{%- endmacro -%} + {# float ------------------------------------------------- #} {% macro mysql__type_float() %} float @@ -89,6 +96,10 @@ float {% endmacro %} +{% macro duckdb__type_float() %} + DOUBLE +{% endmacro %} + {# int ------------------------------------------------- #} {% macro default__type_int() %} int @@ -110,6 +121,10 @@ signed {% endmacro %} +{% macro duckdb__type_int() %} + INTEGER +{% endmacro %} + {# bigint ------------------------------------------------- #} {% macro mysql__type_bigint() %} signed @@ -127,6 +142,10 @@ signed {% endmacro %} +{% macro 
duckdb__type_bigint() %} + BIGINT +{% endmacro %} + {# numeric ------------------------------------------------- --#} {% macro mysql__type_numeric() %} float @@ -140,6 +159,10 @@ float {% endmacro %} +{% macro duckdb__type_numeric() %} + DOUBLE +{% endmacro %} + {# very_large_integer --------------------------------------- --#} {# Most databases don't have a true unbounded numeric datatype, so we use a really big numeric field. @@ -170,6 +193,10 @@ so this macro needs to be called very_large_integer. decimal(38, 0) {% endmacro %} +{% macro duckdb__type_very_large_integer() %} + DECIMAL(38, 0) +{% endmacro %} + {# timestamp ------------------------------------------------- --#} {% macro mysql__type_timestamp() %} time @@ -189,6 +216,10 @@ so this macro needs to be called very_large_integer. time {% endmacro %} +{% macro duckdb__type_timestamp() %} + TIMESTAMP +{% endmacro %} + {# timestamp with time zone ------------------------------------------------- #} {%- macro type_timestamp_with_timezone() -%} @@ -229,6 +260,10 @@ so this macro needs to be called very_large_integer. char(1000) {%- endmacro -%} +{%- macro duckdb__type_timestamp_with_timezone() -%} + TIMESTAMPTZ +{%- endmacro -%} + {# timestamp without time zone ------------------------------------------------- #} {%- macro type_timestamp_without_timezone() -%} @@ -261,6 +296,10 @@ so this macro needs to be called very_large_integer. datetime {% endmacro %} +{% macro duckdb__type_timestamp_without_timezone() %} + TIMESTAMP +{% endmacro %} + {# time without time zone ------------------------------------------------- #} {%- macro type_time_without_timezone() -%} @@ -287,6 +326,9 @@ so this macro needs to be called very_large_integer. time {% endmacro %} +{% macro duckdb__type_time_without_timezone() %} + TIMESTAMP +{% endmacro %} {# time with time zone ------------------------------------------------- #} @@ -330,6 +372,9 @@ so this macro needs to be called very_large_integer. char(1000) {%- endmacro -%} +{%- macro duckdb__type_time_with_timezone() -%} + TIMESTAMPTZ +{%- endmacro -%} {# date ------------------------------------------------- #} {%- macro type_date() -%} diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql index 0b76f5f49a29..aaeabdb5267a 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql @@ -7,6 +7,7 @@ - MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html - ClickHouse: JSONExtractString(json_doc, 'path' [, 'path'] ...) -> https://clickhouse.com/docs/en/sql-reference/functions/json-functions/ - TiDB: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) 
-> https://docs.pingcap.com/tidb/stable/json-functions + - DuckDB: json_extract(json, 'path') note: If path is a LIST, the result will be a LIST of JSON -> https://duckdb.org/docs/extensions/json #} {# format_json_path -------------------------------------------------- #} @@ -103,6 +104,11 @@ {{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }} {%- endmacro %} +{% macro duckdb__format_json_path(json_path_list) -%} + {# -- '$."x"."y"."z"' #} + {{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }} +{%- endmacro %} + {# json_extract ------------------------------------------------- #} {% macro json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} @@ -180,6 +186,14 @@ {% endif -%} {%- endmacro %} +{% macro duckdb__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }}) + {% else %} + json_extract({{ from_table }}.{{ json_column }}, {{ format_json_path(normalized_json_path) }}) + {% endif -%} +{%- endmacro %} + {# json_extract_scalar ------------------------------------------------- #} {% macro json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} @@ -234,6 +248,10 @@ ) {%- endmacro %} +{% macro duckdb__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + json_extract_string({{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + {# json_extract_array ------------------------------------------------- #} {% macro json_extract_array(json_column, json_path_list, normalized_json_path) -%} @@ -284,6 +302,10 @@ json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }}) {%- endmacro %} +{% macro duckdb__json_extract_array(json_column, json_path_list, normalized_json_path) -%} + json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} + {# json_extract_string_array ------------------------------------------------- #} {% macro json_extract_string_array(json_column, json_path_list, normalized_json_path) -%} diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql index 82c856f7e053..f31d8552c667 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql @@ -77,6 +77,10 @@ IF(lower({{ field }}) = 'true', true, false) {%- endmacro %} +{% macro duckdb__cast_to_boolean(field) -%} + cast({{ field }} as boolean) +{%- endmacro %} + {# -- Redshift does not support converting string directly to boolean, it must go through int first #} {% macro redshift__cast_to_boolean(field) -%} {% if redshift_super_type() -%} @@ -105,6 +109,10 @@ nullif({{ field }}, '') {%- endmacro %} +{%- macro duckdb__empty_string_to_null(field) -%} + nullif(nullif({{ field }}, 'null'), '') +{%- endmacro %} + {%- macro redshift__empty_string_to_null(field) -%} nullif({{ field }}::varchar, '') {%- endmacro %} diff --git a/airbyte-integrations/bases/base-normalization/docker-compose.build.yaml b/airbyte-integrations/bases/base-normalization/docker-compose.build.yaml index 4f95cb7a4720..c9b9331f3e29 100644 --- a/airbyte-integrations/bases/base-normalization/docker-compose.build.yaml +++ 
b/airbyte-integrations/bases/base-normalization/docker-compose.build.yaml @@ -57,3 +57,10 @@ services: context: . labels: io.airbyte.git-revision: ${GIT_REVISION} + normalization-duckdb: + image: airbyte/normalization-duckdb:${VERSION} + build: + dockerfile: duckdb.Dockerfile + context: . + labels: + io.airbyte.git-revision: ${GIT_REVISION} diff --git a/airbyte-integrations/bases/base-normalization/docker-compose.yaml b/airbyte-integrations/bases/base-normalization/docker-compose.yaml index ae29237b5149..3b85f9bf0e9e 100644 --- a/airbyte-integrations/bases/base-normalization/docker-compose.yaml +++ b/airbyte-integrations/bases/base-normalization/docker-compose.yaml @@ -18,3 +18,5 @@ services: image: airbyte/normalization-redshift:${VERSION} normalization-tidb: image: airbyte/normalization-tidb:${VERSION} + normalization-duckdb: + image: airbyte/normalization-duckdb:${VERSION} diff --git a/airbyte-integrations/bases/base-normalization/duckdb.Dockerfile b/airbyte-integrations/bases/base-normalization/duckdb.Dockerfile new file mode 100644 index 000000000000..638d1e6afbeb --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/duckdb.Dockerfile @@ -0,0 +1,35 @@ +FROM fishtownanalytics/dbt:1.0.0 +COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte + +# Install SSH Tunneling dependencies +RUN apt-get update && apt-get install -y jq sshpass + +WORKDIR /airbyte +COPY entrypoint.sh . +COPY build/sshtunneling.sh . + +WORKDIR /airbyte/normalization_code +COPY normalization ./normalization +COPY setup.py . +COPY dbt-project-template/ ./dbt-template/ + +# Install python dependencies +WORKDIR /airbyte/base_python_structs +RUN pip install . + +WORKDIR /airbyte/normalization_code +RUN pip install . +RUN pip install dbt-duckdb==1.0.1 + +#adding duckdb manually (outside of setup.py - lots of errors) +RUN pip install duckdb + +WORKDIR /airbyte/normalization_code/dbt-template/ +# Download external dbt dependencies +RUN dbt deps + +WORKDIR /airbyte +ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" +ENTRYPOINT ["/airbyte/entrypoint.sh"] + +LABEL io.airbyte.name=airbyte/normalization-duckdb diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py index 7cb25ea39ad9..5afeb47d57f5 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py @@ -28,6 +28,7 @@ NORMALIZATION_TEST_POSTGRES_DB_PORT = "NORMALIZATION_TEST_POSTGRES_DB_PORT" NORMALIZATION_TEST_CLICKHOUSE_DB_PORT = "NORMALIZATION_TEST_CLICKHOUSE_DB_PORT" NORMALIZATION_TEST_TIDB_DB_PORT = "NORMALIZATION_TEST_TIDB_DB_PORT" +NORMALIZATION_TEST_DUCKDB_DESTINATION_PATH = "NORMALIZATION_TEST_DUCKDB_DESTINATION_PATH" class DbtIntegrationTest(object): @@ -58,6 +59,8 @@ def setup_db(self, destinations_to_test: List[str]): self.setup_clickhouse_db() if DestinationType.TIDB.value in destinations_to_test: self.setup_tidb_db() + if DestinationType.DUCKDB.value in destinations_to_test: + self.setup_duckdb_db() def setup_postgres_db(self): start_db = True diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/duckdb/test_nested_streams/dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/duckdb/test_nested_streams/dbt_project.yml new file mode 100755 index 
000000000000..7631ef356dc9 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/duckdb/test_nested_streams/dbt_project.yml @@ -0,0 +1,63 @@ +# This file is necessary to install dbt-utils with dbt deps +# the content will be overwritten by the transform function + +# Name your package! Package names should contain only lowercase characters +# and underscores. A good package name should reflect your organization's +# name or the intended use of these models +name: "airbyte_utils" +version: "1.0" +config-version: 2 + +# This setting configures which "profile" dbt uses for this project. Profiles contain +# database connection information, and should be configured in the ~/.dbt/profiles.yml file +profile: "normalize" + +# These configurations specify where dbt should look for different types of files. +# The `model-paths` config, for example, states that source models can be found +# in the "models/" directory. You probably won't need to change these! +model-paths: ["models"] +docs-paths: ["docs"] +analysis-paths: ["analysis"] +test-paths: ["tests"] +seed-paths: ["data"] +macro-paths: ["macros"] + +target-path: "../build" # directory which will store compiled SQL files +log-path: "../logs" # directory which will store DBT logs +packages-install-path: "/dbt" # directory which will store external DBT dependencies + +clean-targets: # directories to be removed by `dbt clean` + - "build" + - "dbt_modules" + +quoting: + database: true + # Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785) + # all schemas should be unquoted + schema: false + identifier: true + +# You can define configurations for models in the `model-paths` directory here. +# Using these configurations, you can enable or disable models, change how they +# are materialized, and more! +models: + airbyte_utils: + +materialized: table + generated: + airbyte_ctes: + +tags: airbyte_internal_cte + +materialized: ephemeral + airbyte_incremental: + +tags: incremental_tables + +materialized: incremental + +on_schema_change: sync_all_columns + airbyte_tables: + +tags: normalized_tables + +materialized: table + airbyte_views: + +tags: airbyte_internal_views + +materialized: view + +dispatch: + - macro_namespace: dbt_utils + search_order: ["airbyte_utils", "dbt_utils"] diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/duckdb/test_simple_streams/dbt_project.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/duckdb/test_simple_streams/dbt_project.yml new file mode 100755 index 000000000000..7631ef356dc9 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/duckdb/test_simple_streams/dbt_project.yml @@ -0,0 +1,63 @@ +# This file is necessary to install dbt-utils with dbt deps +# the content will be overwritten by the transform function + +# Name your package! Package names should contain only lowercase characters +# and underscores. A good package name should reflect your organization's +# name or the intended use of these models +name: "airbyte_utils" +version: "1.0" +config-version: 2 + +# This setting configures which "profile" dbt uses for this project. Profiles contain +# database connection information, and should be configured in the ~/.dbt/profiles.yml file +profile: "normalize" + +# These configurations specify where dbt should look for different types of files. 
+# The `model-paths` config, for example, states that source models can be found +# in the "models/" directory. You probably won't need to change these! +model-paths: ["models"] +docs-paths: ["docs"] +analysis-paths: ["analysis"] +test-paths: ["tests"] +seed-paths: ["data"] +macro-paths: ["macros"] + +target-path: "../build" # directory which will store compiled SQL files +log-path: "../logs" # directory which will store DBT logs +packages-install-path: "/dbt" # directory which will store external DBT dependencies + +clean-targets: # directories to be removed by `dbt clean` + - "build" + - "dbt_modules" + +quoting: + database: true + # Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785) + # all schemas should be unquoted + schema: false + identifier: true + +# You can define configurations for models in the `model-paths` directory here. +# Using these configurations, you can enable or disable models, change how they +# are materialized, and more! +models: + airbyte_utils: + +materialized: table + generated: + airbyte_ctes: + +tags: airbyte_internal_cte + +materialized: ephemeral + airbyte_incremental: + +tags: incremental_tables + +materialized: incremental + +on_schema_change: sync_all_columns + airbyte_tables: + +tags: normalized_tables + +materialized: table + airbyte_views: + +tags: airbyte_internal_views + +materialized: view + +dispatch: + - macro_namespace: dbt_utils + search_order: ["airbyte_utils", "dbt_utils"] diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/replace_identifiers.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/replace_identifiers.json index 60e8134f07b0..0c2197f2d759 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/replace_identifiers.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_nested_streams/data_input/replace_identifiers.json @@ -106,5 +106,25 @@ { "\\\"column`_'with\\\"\\\"_quotes\\\" is not null": "coalesce(json_length(`column__'with\\\"_quotes`), 0) > 0" } + ], + "duckdb": [ + { + "_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names": "_airbyte_raw_nested_s__lting_into_long_names" + }, + { + "nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data": "nested_stream_with_co__ion_double_array_data" + }, + { + "nested_stream_with_complex_columns_resulting_into_long_names_partition_data": "nested_stream_with_co___names_partition_data" + }, + { + "nested_stream_with_complex_columns_resulting_into_long_names_partition": "nested_stream_with_co___long_names_partition" + }, + { + "'nested_stream_with_complex_columns_resulting_into_long_names'": "'nested_stream_with_co__lting_into_long_names'" + }, + { + "non_nested_stream_without_namespace_resulting_into_long_names": "non_nested_stream_wit__lting_into_long_names" + } ] } diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index 0163cd128151..7032ba6cd21b 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py @@ -140,7 +140,12 @@ def run_schema_change_normalization(destination_type: DestinationType, 
test_reso if destination_type.value in [DestinationType.MYSQL.value, DestinationType.ORACLE.value]: # TODO: upgrade dbt-adapter repositories to work with dbt 0.21.0+ (outside airbyte's control) pytest.skip(f"{destination_type} does not support schema change in incremental yet (requires dbt 0.21.0+)") - if destination_type.value in [DestinationType.SNOWFLAKE.value, DestinationType.CLICKHOUSE.value, DestinationType.TIDB.value]: + if destination_type.value in [ + DestinationType.SNOWFLAKE.value, + DestinationType.CLICKHOUSE.value, + DestinationType.TIDB.value, + DestinationType.DUCKDB.value, + ]: pytest.skip(f"{destination_type} is disabled as it doesnt support schema change in incremental yet (column type changes)") if destination_type.value in [DestinationType.MSSQL.value, DestinationType.SNOWFLAKE.value]: # TODO: create/fix github issue in corresponding dbt-adapter repository to handle schema changes (outside airbyte's control) @@ -213,6 +218,9 @@ def setup_test_dir(destination_type: DestinationType, test_resource_name: str) - elif destination_type.value == DestinationType.TIDB.value: copy_tree("../dbt-project-template-tidb", test_root_dir) dbt_project_yaml = "../dbt-project-template-tidb/dbt_project.yml" + elif destination_type.value == DestinationType.DUCKDB.value: + copy_tree("../dbt-project-template-duckdb", test_root_dir) + dbt_project_yaml = "../dbt-project-template-duckdb/dbt_project.yml" dbt_test_utils.copy_replace(dbt_project_yaml, os.path.join(test_root_dir, "dbt_project.yml")) return test_root_dir diff --git a/airbyte-integrations/bases/base-normalization/normalization/destination_type.py b/airbyte-integrations/bases/base-normalization/normalization/destination_type.py index 3f1d154f52ce..36b2ea371137 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/destination_type.py +++ b/airbyte-integrations/bases/base-normalization/normalization/destination_type.py @@ -16,6 +16,7 @@ class DestinationType(Enum): REDSHIFT = "redshift" SNOWFLAKE = "snowflake" TIDB = "tidb" + DUCKDB = "duckdb" @classmethod def from_string(cls, string_value: str) -> "DestinationType": diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/catalog_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/catalog_processor.py index 2cbfe09394e4..5f55bf437c02 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/catalog_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/catalog_processor.py @@ -69,7 +69,11 @@ def process(self, catalog_file: str, json_column_name: str, default_schema: str) ) for stream_processor in stream_processors: # MySQL table names need to be manually truncated, because it does not do it automatically - truncate = self.destination_type == DestinationType.MYSQL or self.destination_type == DestinationType.TIDB + truncate = ( + self.destination_type == DestinationType.MYSQL + or self.destination_type == DestinationType.TIDB + or self.destination_type == DestinationType.DUCKDB + ) raw_table_name = self.name_transformer.normalize_table_name(f"_airbyte_raw_{stream_processor.stream_name}", truncate=truncate) add_table_to_sources(schema_to_source_tables, stream_processor.schema, raw_table_name) @@ -116,7 +120,11 @@ def build_stream_processor( stream_name = get_field(stream_config, "name", f"Invalid Stream: 'name' is not defined in stream: {str(stream_config)}") # MySQL table names need to be manually truncated, because it does 
not do it automatically - truncate = destination_type == DestinationType.MYSQL or destination_type == DestinationType.TIDB + truncate = ( + destination_type == DestinationType.MYSQL + or destination_type == DestinationType.TIDB + or destination_type == DestinationType.DUCKDB + ) raw_table_name = name_transformer.normalize_table_name(f"_airbyte_raw_{stream_name}", truncate=truncate) source_sync_mode = get_source_sync_mode(configured_stream, stream_name) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py index b65c5545e56e..d1b6d95a6c27 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py @@ -29,6 +29,9 @@ DestinationType.CLICKHOUSE.value: 63, # https://docs.pingcap.com/tidb/stable/tidb-limitations DestinationType.TIDB.value: 64, + # According to the DuckDB team there no restriction: We don't enforce a maximum right now but I would not recommend having column names + # longer than a few kilobytes. https://discord.com/channels/909674491309850675/1067042662827438122/1067043835768737893. + DestinationType.DUCKDB.value: 64, } # DBT also needs to generate suffix to table names, so we need to make sure it has enough characters to do so... @@ -170,7 +173,11 @@ def __normalize_identifier_name( result = result.replace('"', "_") result = result.replace("`", "_") result = result.replace("'", "_") - elif self.destination_type.value != DestinationType.MYSQL.value and self.destination_type.value != DestinationType.TIDB.value: + elif ( + self.destination_type.value != DestinationType.MYSQL.value + and self.destination_type.value != DestinationType.TIDB.value + and self.destination_type.value != DestinationType.DUCKDB.value + ): result = result.replace('"', '""') else: result = result.replace("`", "_") @@ -239,6 +246,9 @@ def __normalize_identifier_case(self, input_name: str, is_quoted: bool = False) elif self.destination_type.value == DestinationType.TIDB.value: if not is_quoted and not self.needs_quotes(input_name): result = input_name.lower() + elif self.destination_type.value == DestinationType.DUCKDB.value: + if not is_quoted and not self.needs_quotes(input_name): + result = input_name.lower() else: raise KeyError(f"Unknown destination type {self.destination_type}") return result @@ -279,6 +289,8 @@ def normalize_column_identifier_case_for_lookup(self, input_name: str, is_quoted pass elif self.destination_type.value == DestinationType.TIDB.value: result = input_name.lower() + elif self.destination_type.value == DestinationType.DUCKDB.value: + result = input_name.lower() else: raise KeyError(f"Unknown destination type {self.destination_type}") return result diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py index 0931b4f29c29..1f20d317156c 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py @@ -3111,6 +3111,150 @@ "ZEROFILL", } +# DuckDB uses Sqlite interface: https://www.sqlite.org/lang_keywords.html +DUCKDB = { + 
"ANALYZE", + "AND", + "AS", + "ASC", + "ATTACH", + "AUTOINCREMENT", + "BEFORE", + "BEGIN", + "BETWEEN", + "BY", + "CASCADE", + "CASE", + "CAST", + "CHECK", + "COLLATE", + "COLUMN", + "COMMIT", + "CONFLICT", + "CONSTRAINT", + "CREATE", + "CROSS", + "CURRENT", + "CURRENT_DATE", + "CURRENT_TIME", + "CURRENT_TIMESTAMP", + "DATABASE", + "DEFAULT", + "DEFERRABLE", + "DEFERRED", + "DELETE", + "DESC", + "DETACH", + "DISTINCT", + "DO", + "DROP", + "EACH", + "ELSE", + "END", + "ESCAPE", + "EXCEPT", + "EXCLUDE", + "EXCLUSIVE", + "EXISTS", + "EXPLAIN", + "FAIL", + "FILTER", + "FIRST", + "FOLLOWING", + "FOR", + "FOREIGN", + "FROM", + "FULL", + "GENERATED", + "GLOB", + "GROUP", + "GROUPS", + "HAVING", + "IF", + "IGNORE", + "IMMEDIATE", + "IN", + "INDEX", + "INDEXED", + "INITIALLY", + "INNER", + "INSERT", + "INSTEAD", + "INTERSECT", + "INTO", + "IS", + "ISNULL", + "JOIN", + "KEY", + "LAST", + "LEFT", + "LIKE", + "LIMIT", + "MATCH", + "MATERIALIZED", + "NATURAL", + "NO", + "NOT", + "NOTHING", + "NOTNULL", + "NULL", + "NULLS", + "OF", + "OFFSET", + "ON", + "OR", + "ORDER", + "OTHERS", + "OUTER", + "OVER", + "PARTITION", + "PLAN", + "PRAGMA", + "PRECEDING", + "PRIMARY", + "QUERY", + "RAISE", + "RANGE", + "RECURSIVE", + "REFERENCES", + "REGEXP", + "REINDEX", + "RELEASE", + "RENAME", + "REPLACE", + "RESTRICT", + "RETURNING", + "RIGHT", + "ROLLBACK", + "ROW", + "ROWS", + "SAVEPOINT", + "SELECT", + "SET", + "TABLE", + "TEMP", + "TEMPORARY", + "THEN", + "TIES", + "TO", + "TRANSACTION", + "TRIGGER", + "UNBOUNDED", + "UNION", + "UNIQUE", + "UPDATE", + "USING", + "VACUUM", + "VALUES", + "VIEW", + "VIRTUAL", + "WHEN", + "WHERE", + "WINDOW", + "WITH", + "WITHOUT", +} + RESERVED_KEYWORDS = { DestinationType.BIGQUERY.value: BIGQUERY, DestinationType.POSTGRES.value: POSTGRES, @@ -3121,6 +3265,7 @@ DestinationType.MSSQL.value: MSSQL, DestinationType.CLICKHOUSE.value: CLICKHOUSE, DestinationType.TIDB.value: TIDB, + DestinationType.DUCKDB.value: DUCKDB, } diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 231588f92903..2bcf74fa12ba 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -548,7 +548,11 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column: sql_type = jinja_call("type_timestamp_with_timezone()") return f"cast({replace_operation} as {sql_type}) as {column_name}" elif is_date(definition): - if self.destination_type.value == DestinationType.MYSQL.value or self.destination_type.value == DestinationType.TIDB.value: + if ( + self.destination_type.value == DestinationType.MYSQL.value + or self.destination_type.value == DestinationType.TIDB.value + or self.destination_type.value == DestinationType.DUCKDB.value + ): # MySQL does not support [cast] and [nullif] functions together return self.generate_mysql_date_format_statement(column_name) replace_operation = jinja_call(f"empty_string_to_null({jinja_column})") @@ -570,7 +574,11 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column: trimmed_column_name = f"trim(BOTH '\"' from {column_name})" sql_type = f"'{sql_type}'" return f"nullif(accurateCastOrNull({trimmed_column_name}, {sql_type}), 'null') as {column_name}" - if self.destination_type == DestinationType.MYSQL or self.destination_type 
== DestinationType.TIDB: + if ( + self.destination_type == DestinationType.MYSQL + or self.destination_type == DestinationType.TIDB + or self.destination_type == DestinationType.DUCKDB + ): return f'nullif(cast({column_name} as {sql_type}), "") as {column_name}' replace_operation = jinja_call(f"empty_string_to_null({jinja_column})") return f"cast({replace_operation} as {sql_type}) as {column_name}" @@ -1146,7 +1154,11 @@ def wrap_in_quotes(s: str) -> str: schema = self.get_schema(is_intermediate) # MySQL table names need to be manually truncated, because it does not do it automatically - truncate_name = self.destination_type == DestinationType.MYSQL or self.destination_type == DestinationType.TIDB + truncate_name = ( + self.destination_type == DestinationType.MYSQL + or self.destination_type == DestinationType.TIDB + or self.destination_type == DestinationType.DUCKDB + ) table_name = self.tables_registry.get_table_name(schema, self.json_path, self.stream_name, suffix, truncate_name) file_name = self.tables_registry.get_file_name(schema, self.json_path, self.stream_name, suffix, truncate_name) file = f"{file_name}.sql" diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py index f32f290de6a9..005b85b276d6 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py @@ -100,5 +100,12 @@ def extract_schema(profiles_yml: Dict) -> str: raise KeyError("No Dataset/Schema defined in profiles.yml") +def extract_path(profiles_yml: Dict) -> str: + if "path" in profiles_yml: + return str(profiles_yml["path"]) + else: + raise KeyError("No destination_path defined in profiles.yml") + + def main(args=None): TransformCatalog().run(args) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index a762b39f1a45..7f8e2be3a9a3 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -59,6 +59,7 @@ def transform(self, integration_type: DestinationType, config: Dict[str, Any]): DestinationType.MSSQL.value: self.transform_mssql, DestinationType.CLICKHOUSE.value: self.transform_clickhouse, DestinationType.TIDB.value: self.transform_tidb, + DestinationType.DUCKDB.value: self.transform_duckdb, }[integration_type.value](config) # merge pre-populated base_profile with destination-specific configuration. 
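The next hunk adds transform_duckdb, the piece of transform_config that turns the connector's configuration into a dbt-duckdb profile. As a quick illustration (a standalone Python sketch that mirrors the method added below; the function name duckdb_profile and the sample path are illustrative and not part of the patch), the mapping looks like this:

    # Sketch of the dbt profile produced for DuckDB, mirroring the transform_duckdb
    # method added in the hunk below. "destination_path" is the required field from
    # the connector spec; "schema" falls back to DuckDB's default schema "main",
    # matching the spec's description ("database schema, default for duckdb is main").
    from typing import Any, Dict

    def duckdb_profile(config: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "type": "duckdb",                        # dbt-duckdb adapter type
            "path": config["destination_path"],      # e.g. "/local/destination.duckdb"
            "schema": config.get("schema", "main"),  # same default as the connector spec
        }

    # Example:
    # duckdb_profile({"destination_path": "/local/destination.duckdb"})
    # -> {"type": "duckdb", "path": "/local/destination.duckdb", "schema": "main"}

Keeping schema optional with a "main" default means the profile stays valid even when the destination config only supplies destination_path.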
@@ -345,6 +346,16 @@ def transform_tidb(config: Dict[str, Any]): } return dbt_config + @staticmethod + def transform_duckdb(config: Dict[str, Any]): + print("transform_duckdb") + dbt_config = { + "type": "duckdb", + "path": config["destination_path"], + "schema": config["schema"] if "schema" in config else "main", + } + return dbt_config + @staticmethod def read_json_config(input_path: str): with open(input_path, "r") as file: diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/resources/long_name_truncate_collisions_catalog_expected_duckdb_names.json b/airbyte-integrations/bases/base-normalization/unit_tests/resources/long_name_truncate_collisions_catalog_expected_duckdb_names.json new file mode 100644 index 000000000000..160fc5b70b75 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/unit_tests/resources/long_name_truncate_collisions_catalog_expected_duckdb_names.json @@ -0,0 +1,32 @@ +{ + "_airbyte_another.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine": { + "file": "postgres_has_a_64_cha__destinations_are_fine", + "schema": "_airbyte_another", + "table": "postgres_has_a_64_cha__destinations_are_fine" + }, + "_airbyte_schema_test.postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine.postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine": { + "file": "postgres_has_a_64_cha__inations_are_fine_d2b", + "schema": "_airbyte_schema_test", + "table": "postgres_has_a_64_cha__inations_are_fine_d2b" + }, + "_airbyte_schema_test.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine": { + "file": "postgres_has_a_64_cha__inations_are_fine_e5a", + "schema": "_airbyte_schema_test", + "table": "postgres_has_a_64_cha__inations_are_fine_e5a" + }, + "another.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine": { + "file": "postgres_has_a_64_cha__destinations_are_fine", + "schema": "another", + "table": "postgres_has_a_64_cha__destinations_are_fine" + }, + "schema_test.postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine.postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine": { + "file": "postgres_has_a_64_cha__inations_are_fine_d2b", + "schema": "schema_test", + "table": "postgres_has_a_64_cha__inations_are_fine_d2b" + }, + "schema_test.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine.postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine": { + "file": "postgres_has_a_64_cha__inations_are_fine_e5a", + "schema": "schema_test", + "table": "postgres_has_a_64_cha__inations_are_fine_e5a" + } +} diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/resources/nested_catalog_expected_duckdb_names.json b/airbyte-integrations/bases/base-normalization/unit_tests/resources/nested_catalog_expected_duckdb_names.json new file mode 100644 index 000000000000..2bbb864cc4d8 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/unit_tests/resources/nested_catalog_expected_duckdb_names.json @@ -0,0 +1,252 @@ +{ + "_airbyte_schema_test.adcreatives.adcreatives": { + "file": "adcreatives", + "schema": 
"_airbyte_schema_test", + "table": "adcreatives" + }, + "_airbyte_schema_test.adcreatives_adlabels.adlabels": { + "file": "adcreatives_adlabels", + "schema": "_airbyte_schema_test", + "table": "adcreatives_adlabels" + }, + "_airbyte_schema_test.adcreatives_image_crops.image_crops": { + "file": "adcreatives_image_crops", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops" + }, + "_airbyte_schema_test.adcreatives_image_crops_100x100.100x100": { + "file": "adcreatives_image_crops_100x100", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_100x100" + }, + "_airbyte_schema_test.adcreatives_image_crops_100x72.100x72": { + "file": "adcreatives_image_crops_100x72", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_100x72" + }, + "_airbyte_schema_test.adcreatives_image_crops_191x100.191x100": { + "file": "adcreatives_image_crops_191x100", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_191x100" + }, + "_airbyte_schema_test.adcreatives_image_crops_400x150.400x150": { + "file": "adcreatives_image_crops_400x150", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_400x150" + }, + "_airbyte_schema_test.adcreatives_image_crops_400x500.400x500": { + "file": "adcreatives_image_crops_400x500", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_400x500" + }, + "_airbyte_schema_test.adcreatives_image_crops_600x360.600x360": { + "file": "adcreatives_image_crops_600x360", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_600x360" + }, + "_airbyte_schema_test.adcreatives_image_crops_90x160.90x160": { + "file": "adcreatives_image_crops_90x160", + "schema": "_airbyte_schema_test", + "table": "adcreatives_image_crops_90x160" + }, + "_airbyte_schema_test.adcreatives_object_story_spec.object_story_spec": { + "file": "adcreatives_object_story_spec", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_story_spec" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data.link_data": { + "file": "adcreatives_object_story_spec_link_data", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_story_spec_link_data" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_app_link_spec.app_link_spec": { + "file": "adcreatives_object_st__nk_data_app_link_spec", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__nk_data_app_link_spec" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_app_link_spec_android.android": { + "file": "adcreatives_object_st__app_link_spec_android", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__app_link_spec_android" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_app_link_spec_ios.ios": { + "file": "adcreatives_object_st__ata_app_link_spec_ios", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__ata_app_link_spec_ios" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_app_link_spec_ipad.ipad": { + "file": "adcreatives_object_st__ta_app_link_spec_ipad", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__ta_app_link_spec_ipad" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_app_link_spec_iphone.iphone": { + "file": "adcreatives_object_st___app_link_spec_iphone", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st___app_link_spec_iphone" + }, + 
"_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops.image_crops": { + "file": "adcreatives_object_st__link_data_image_crops", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__link_data_image_crops" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_100x100.100x100": { + "file": "adcreatives_object_st__a_image_crops_100x100", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__a_image_crops_100x100" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_100x72.100x72": { + "file": "adcreatives_object_st__ta_image_crops_100x72", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__ta_image_crops_100x72" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_191x100.191x100": { + "file": "adcreatives_object_st__a_image_crops_191x100", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__a_image_crops_191x100" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_400x150.400x150": { + "file": "adcreatives_object_st__a_image_crops_400x150", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__a_image_crops_400x150" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_400x500.400x500": { + "file": "adcreatives_object_st__a_image_crops_400x500", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__a_image_crops_400x500" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_600x360.600x360": { + "file": "adcreatives_object_st__a_image_crops_600x360", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__a_image_crops_600x360" + }, + "_airbyte_schema_test.adcreatives_object_story_spec_link_data_image_crops_90x160.90x160": { + "file": "adcreatives_object_st__ta_image_crops_90x160", + "schema": "_airbyte_schema_test", + "table": "adcreatives_object_st__ta_image_crops_90x160" + }, + "schema_test.adcreatives.adcreatives": { + "file": "adcreatives", + "schema": "schema_test", + "table": "adcreatives" + }, + "schema_test.adcreatives_adlabels.adlabels": { + "file": "adcreatives_adlabels", + "schema": "schema_test", + "table": "adcreatives_adlabels" + }, + "schema_test.adcreatives_image_crops.image_crops": { + "file": "adcreatives_image_crops", + "schema": "schema_test", + "table": "adcreatives_image_crops" + }, + "schema_test.adcreatives_image_crops_100x100.100x100": { + "file": "adcreatives_image_crops_100x100", + "schema": "schema_test", + "table": "adcreatives_image_crops_100x100" + }, + "schema_test.adcreatives_image_crops_100x72.100x72": { + "file": "adcreatives_image_crops_100x72", + "schema": "schema_test", + "table": "adcreatives_image_crops_100x72" + }, + "schema_test.adcreatives_image_crops_191x100.191x100": { + "file": "adcreatives_image_crops_191x100", + "schema": "schema_test", + "table": "adcreatives_image_crops_191x100" + }, + "schema_test.adcreatives_image_crops_400x150.400x150": { + "file": "adcreatives_image_crops_400x150", + "schema": "schema_test", + "table": "adcreatives_image_crops_400x150" + }, + "schema_test.adcreatives_image_crops_400x500.400x500": { + "file": "adcreatives_image_crops_400x500", + "schema": "schema_test", + "table": "adcreatives_image_crops_400x500" + }, + "schema_test.adcreatives_image_crops_600x360.600x360": { + "file": "adcreatives_image_crops_600x360", + "schema": "schema_test", + "table": "adcreatives_image_crops_600x360" + }, + 
"schema_test.adcreatives_image_crops_90x160.90x160": { + "file": "adcreatives_image_crops_90x160", + "schema": "schema_test", + "table": "adcreatives_image_crops_90x160" + }, + "schema_test.adcreatives_object_story_spec.object_story_spec": { + "file": "adcreatives_object_story_spec", + "schema": "schema_test", + "table": "adcreatives_object_story_spec" + }, + "schema_test.adcreatives_object_story_spec_link_data.link_data": { + "file": "adcreatives_object_story_spec_link_data", + "schema": "schema_test", + "table": "adcreatives_object_story_spec_link_data" + }, + "schema_test.adcreatives_object_story_spec_link_data_app_link_spec.app_link_spec": { + "file": "adcreatives_object_st__nk_data_app_link_spec", + "schema": "schema_test", + "table": "adcreatives_object_st__nk_data_app_link_spec" + }, + "schema_test.adcreatives_object_story_spec_link_data_app_link_spec_android.android": { + "file": "adcreatives_object_st__app_link_spec_android", + "schema": "schema_test", + "table": "adcreatives_object_st__app_link_spec_android" + }, + "schema_test.adcreatives_object_story_spec_link_data_app_link_spec_ios.ios": { + "file": "adcreatives_object_st__ata_app_link_spec_ios", + "schema": "schema_test", + "table": "adcreatives_object_st__ata_app_link_spec_ios" + }, + "schema_test.adcreatives_object_story_spec_link_data_app_link_spec_ipad.ipad": { + "file": "adcreatives_object_st__ta_app_link_spec_ipad", + "schema": "schema_test", + "table": "adcreatives_object_st__ta_app_link_spec_ipad" + }, + "schema_test.adcreatives_object_story_spec_link_data_app_link_spec_iphone.iphone": { + "file": "adcreatives_object_st___app_link_spec_iphone", + "schema": "schema_test", + "table": "adcreatives_object_st___app_link_spec_iphone" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops.image_crops": { + "file": "adcreatives_object_st__link_data_image_crops", + "schema": "schema_test", + "table": "adcreatives_object_st__link_data_image_crops" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_100x100.100x100": { + "file": "adcreatives_object_st__a_image_crops_100x100", + "schema": "schema_test", + "table": "adcreatives_object_st__a_image_crops_100x100" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_100x72.100x72": { + "file": "adcreatives_object_st__ta_image_crops_100x72", + "schema": "schema_test", + "table": "adcreatives_object_st__ta_image_crops_100x72" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_191x100.191x100": { + "file": "adcreatives_object_st__a_image_crops_191x100", + "schema": "schema_test", + "table": "adcreatives_object_st__a_image_crops_191x100" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_400x150.400x150": { + "file": "adcreatives_object_st__a_image_crops_400x150", + "schema": "schema_test", + "table": "adcreatives_object_st__a_image_crops_400x150" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_400x500.400x500": { + "file": "adcreatives_object_st__a_image_crops_400x500", + "schema": "schema_test", + "table": "adcreatives_object_st__a_image_crops_400x500" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_600x360.600x360": { + "file": "adcreatives_object_st__a_image_crops_600x360", + "schema": "schema_test", + "table": "adcreatives_object_st__a_image_crops_600x360" + }, + "schema_test.adcreatives_object_story_spec_link_data_image_crops_90x160.90x160": { + "file": "adcreatives_object_st__ta_image_crops_90x160", + "schema": "schema_test", + "table": 
"adcreatives_object_st__ta_image_crops_90x160" + } +} diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/resources/un-nesting_collisions_catalog_expected_duckdb_names.json b/airbyte-integrations/bases/base-normalization/unit_tests/resources/un-nesting_collisions_catalog_expected_duckdb_names.json new file mode 100644 index 000000000000..0ae55a762fd8 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/unit_tests/resources/un-nesting_collisions_catalog_expected_duckdb_names.json @@ -0,0 +1,52 @@ +{ + "_airbyte_namespace.simple stream name.simple stream name": { + "file": "simple_stream_name_f35", + "schema": "_airbyte_namespace", + "table": "simple_stream_name_f35" + }, + "namespace.simple stream name.simple stream name": { + "file": "simple_stream_name_f35", + "schema": "namespace", + "table": "simple_stream_name_f35" + }, + "_airbyte_namespace.simple_stream_name.stream_name": { + "file": "_airbyte_namespace_simple_b94_stream_name", + "schema": "_airbyte_namespace", + "table": "simple_b94_stream_name" + }, + "namespace.simple_stream_name.stream_name": { + "file": "namespace_simple_b94_stream_name", + "schema": "namespace", + "table": "simple_b94_stream_name" + }, + "_airbyte_namespace.simple.simple": { + "file": "simple", + "schema": "_airbyte_namespace", + "table": "simple" + }, + "namespace.simple.simple": { + "file": "simple", + "schema": "namespace", + "table": "simple" + }, + "_airbyte_other_namespace.simple_b94_stream_name.simple_b94_stream_name": { + "file": "_airbyte_other_namesp__e_b94_stream_name_f9d", + "schema": "_airbyte_other_namespace", + "table": "simple_b94_stream_name" + }, + "other_namespace.simple_b94_stream_name.simple_b94_stream_name": { + "file": "other_namespace_simple_b94_stream_name", + "schema": "other_namespace", + "table": "simple_b94_stream_name" + }, + "_airbyte_yet_another_namespace_with_a_very_long_name.simple_b94_stream_name.simple_b94_stream_name": { + "file": "_airbyte_yet_another___e_b94_stream_name_bae", + "schema": "_airbyte_yet_another_namespace_with_a_very_long_name", + "table": "simple_b94_stream_name" + }, + "yet_another_namespace_with_a_very_long_name.simple_b94_stream_name.simple_b94_stream_name": { + "file": "yet_another_namespace__e_b94_stream_name_5d1", + "schema": "yet_another_namespace_with_a_very_long_name", + "table": "simple_b94_stream_name" + } +} diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py b/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py index bcb750df766b..952a1243c65d 100644 --- a/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py +++ b/airbyte-integrations/bases/base-normalization/unit_tests/test_destination_name_transformer.py @@ -38,6 +38,7 @@ def before_tests(request): ("Hello World", "MySQL", True), ("Hello World", "MSSQL", True), ("Hello World", "TiDB", True), + ("Hello World", "DuckDB", True), # Reserved Word for BigQuery and MySQL only ("Groups", "Postgres", False), ("Groups", "BigQuery", True), @@ -46,6 +47,7 @@ def before_tests(request): ("Groups", "MySQL", True), ("Groups", "MSSQL", False), ("Groups", "TiDB", True), + ("Groups", "DuckDB", True), # Doesnt start with alpha or underscore ("100x200", "Postgres", True), ("100x200", "BigQuery", False), @@ -54,6 +56,7 @@ def before_tests(request): ("100x200", "MySQL", True), ("100x200", "MSSQL", True), ("100x200", "TiDB", True), + ("100x200", "DuckDB", True), # Contains non alpha numeric 
("post.wall", "Postgres", True), ("post.wall", "BigQuery", False), @@ -62,6 +65,7 @@ def before_tests(request): ("post.wall", "MySQL", True), ("post.wall", "MSSQL", True), ("post.wall", "TiDB", True), + ("post.wall", "DuckDB", True), ], ) def test_needs_quote(input_str: str, destination_type: str, expected: bool): @@ -113,6 +117,7 @@ def test_transform_standard_naming(input_str: str, expected: str): ("Identifier Name", "MySQL", "{{ adapter.quote('Identifier Name') }}", "adapter.quote('Identifier Name')"), ("Identifier Name", "MSSQL", "{{ adapter.quote('Identifier Name') }}", "adapter.quote('Identifier Name')"), ("Identifier Name", "TiDB", "{{ adapter.quote('Identifier Name') }}", "adapter.quote('Identifier Name')"), + ("Identifier Name", "DuckDB", "{{ adapter.quote('Identifier Name') }}", "adapter.quote('Identifier Name')"), # Reserved Word for BigQuery and MySQL only ("Groups", "Postgres", "groups", "'groups'"), ("Groups", "BigQuery", "{{ adapter.quote('Groups') }}", "adapter.quote('Groups')"), @@ -121,6 +126,7 @@ def test_transform_standard_naming(input_str: str, expected: str): ("Groups", "MySQL", "{{ adapter.quote('Groups') }}", "adapter.quote('Groups')"), ("Groups", "MSSQL", "groups", "'groups'"), ("Groups", "TiDB", "{{ adapter.quote('Groups') }}", "adapter.quote('Groups')"), + ("Groups", "DuckDB", "{{ adapter.quote('Groups') }}", "adapter.quote('Groups')"), ], ) def test_normalize_column_name(input_str: str, destination_type: str, expected: str, expected_in_jinja: str): @@ -171,6 +177,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("Identifier Name5", "MySQL", "identifier_name5", "{{ adapter.quote('Identifier Name5') }}"), ("Identifier Name6", "MSSQL", "identifier_name6", "{{ adapter.quote('Identifier Name6') }}"), ("Identifier Name7", "TiDB", "identifier_name7", "{{ adapter.quote('Identifier Name7') }}"), + ("Identifier Name8", "DuckDB", "identifier_name8", "{{ adapter.quote('Identifier Name8') }}"), # Unicode ("a-Unicode_name_文1", "Postgres", "a_unicode_name__1", "{{ adapter.quote('a-Unicode_name_文1') }}"), ("a-Unicode_name_文2", "BigQuery", "a_Unicode_name__2", "a_Unicode_name__2"), @@ -179,6 +186,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("a-Unicode_name_文5", "MySQL", "a_unicode_name__5", "{{ adapter.quote('a-Unicode_name_文5') }}"), ("a-Unicode_name_文6", "MSSQL", "a_unicode_name__6", "{{ adapter.quote('a-Unicode_name_文6') }}"), ("a-Unicode_name_文7", "TiDB", "a_unicode_name__7", "{{ adapter.quote('a-Unicode_name_文7') }}"), + ("a-Unicode_name_文8", "DuckDB", "a_unicode_name__8", "{{ adapter.quote('a-Unicode_name_文8') }}"), # Doesnt start with alpha or underscore ("100x2001", "Postgres", "100x2001", "{{ adapter.quote('100x2001') }}"), ("100x2002", "BigQuery", "100x2002", "_100x2002"), @@ -188,6 +196,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("100x2005", "MySQL", "100x2005", "{{ adapter.quote('100x2005') }}"), ("100x2006", "MSSQL", "_100x2006", "{{ adapter.quote('100x2006') }}"), ("100x2007", "TiDB", "100x2007", "{{ adapter.quote('100x2007') }}"), + ("100x2008", "DuckDB", "100x2008", "{{ adapter.quote('100x2008') }}"), # Reserved Keywords in BQ and MySQL ("Groups", "Postgres", "groups", "groups"), ("Groups", "BigQuery", "Groups", "{{ adapter.quote('Groups') }}"), @@ -196,6 +205,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("Groups", "MySQL", "Groups", "{{ adapter.quote('Groups') }}"), ("Groups", "MSSQL", "groups", "groups"), ("Groups", "TiDB", "Groups", "{{ adapter.quote('Groups') }}"), + ("Groups", 
"DuckDB", "Groups", "{{ adapter.quote('Groups') }}"), # Reserved Keywords ("DisTincT", "Postgres", "DisTincT", "{{ adapter.quote('DisTincT') }}"), ("DisTincT", "BigQuery", "DisTincT", "{{ adapter.quote('DisTincT') }}"), @@ -204,6 +214,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("DisTincT", "MySQL", "DisTincT", "{{ adapter.quote('DisTincT') }}"), ("DisTincT", "MSSQL", "DisTincT", "{{ adapter.quote('DisTincT') }}"), ("DisTincT", "TiDB", "DisTincT", "{{ adapter.quote('DisTincT') }}"), + ("DisTincT", "DuckDB", "DisTincT", "{{ adapter.quote('DisTincT') }}"), # Quoted identifiers ("'QuoTed1 IdenTifiER'", "Postgres", "_quoted1_identifier_", "{{ adapter.quote('\\'QuoTed1 IdenTifiER\\'') }}"), ("'QuoTed2 IdenTifiER'", "BigQuery", "_QuoTed2_IdenTifiER_", "_QuoTed2_IdenTifiER_"), @@ -212,6 +223,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("'QuoTed5 IdenTifiER'", "MySQL", "_quoted5_identifier_", "{{ adapter.quote('\\'QuoTed5 IdenTifiER\\'') }}"), ("'QuoTed6 IdenTifiER'", "MSSQL", "_quoted6_identifier_", "{{ adapter.quote('\\'QuoTed6 IdenTifiER\\'') }}"), ("'QuoTed7 IdenTifiER'", "TiDB", "_quoted7_identifier_", "{{ adapter.quote('\\'QuoTed7 IdenTifiER\\'') }}"), + ("'QuoTed8 IdenTifiER'", "DuckDB", "_quoted8_identifier_", "{{ adapter.quote('\\'QuoTed8 IdenTifiER\\'') }}"), # Double Quoted identifiers ('"QuoTed7 IdenTifiER"', "Postgres", "_quoted7_identifier_", '{{ adapter.quote(\'""QuoTed7 IdenTifiER""\') }}'), ('"QuoTed8 IdenTifiER"', "BigQuery", "_QuoTed8_IdenTifiER_", "_QuoTed8_IdenTifiER_"), @@ -220,6 +232,7 @@ def test_truncate_identifier(input_str: str, expected: str): ('"QuoTed11 IdenTifiER"', "MySQL", "_quoted11_identifier_", "{{ adapter.quote('\"QuoTed11 IdenTifiER\"') }}"), ('"QuoTed12 IdenTifiER"', "MSSQL", "_quoted12_identifier_", '{{ adapter.quote(\'""QuoTed12 IdenTifiER""\') }}'), ('"QuoTed13 IdenTifiER"', "TiDB", "_quoted13_identifier_", "{{ adapter.quote('\"QuoTed13 IdenTifiER\"') }}"), + ('"QuoTed14 IdenTifiER"', "DuckDB", "_quoted14_identifier_", "{{ adapter.quote('\"QuoTed14 IdenTifiER\"') }}"), # Back Quoted identifiers ("`QuoTed13 IdenTifiER`", "Postgres", "_quoted13_identifier_", "{{ adapter.quote('`QuoTed13 IdenTifiER`') }}"), ("`QuoTed14 IdenTifiER`", "BigQuery", "_QuoTed14_IdenTifiER_", "_QuoTed14_IdenTifiER_"), @@ -228,6 +241,7 @@ def test_truncate_identifier(input_str: str, expected: str): ("`QuoTed17 IdenTifiER`", "MySQL", "_quoted17_identifier_", "{{ adapter.quote('_QuoTed17 IdenTifiER_') }}"), ("`QuoTed18 IdenTifiER`", "MSSQL", "_quoted18_identifier_", "{{ adapter.quote('`QuoTed18 IdenTifiER`') }}"), ("`QuoTed17 IdenTifiER`", "TiDB", "_quoted17_identifier_", "{{ adapter.quote('_QuoTed17 IdenTifiER_') }}"), + ("`QuoTed19 IdenTifiER`", "DuckDB", "_quoted19_identifier_", "{{ adapter.quote('_QuoTed19 IdenTifiER_') }}"), ], ) def test_normalize_name(input_str: str, destination_type: str, expected: str, expected_column: str): diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py index 2adbb2f441cf..ed9f685d305a 100644 --- a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py +++ b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py @@ -11,7 +11,7 @@ import pytest from normalization.destination_type import DestinationType -from normalization.transform_catalog.transform import extract_schema +from normalization.transform_catalog.transform import 
extract_path, extract_schema from normalization.transform_config.transform import TransformConfig @@ -505,6 +505,39 @@ def test_transform_tidb(self): assert expected == actual assert extract_schema(actual) == "ti_db" + def test_transform_duckdb_schema(self): + input = { + "type": "duckdb", + "destination_path": "/local/testing.duckdb", + "schema": "quackqauck", + } + + actual = TransformConfig().transform_duckdb(input) + expected = { + "type": "duckdb", + "path": "/local/testing.duckdb", + "schema": "quackqauck", + } + + assert expected == actual + assert extract_path(actual) == "/local/testing.duckdb" + + def test_transform_duckdb_no_schema(self): + input = { + "type": "duckdb", + "destination_path": "/local/testing.duckdb", + } + + actual = TransformConfig().transform_duckdb(input) + expected = { + "type": "duckdb", + "path": "/local/testing.duckdb", + "schema": "main", + } + + assert expected == actual + assert extract_path(actual) == "/local/testing.duckdb" + def get_base_config(self): return { "config": { diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 5ca71363817a..25154fc33780 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -34,6 +34,7 @@ | Dixa | [![source-dixa](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-dixa%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-dixa) | | Dockerhub | [![source-dockerhub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-dockerhub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-dockerhub) | | Drift | [![source-drift](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-drift%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-drift) | +| DuckDB | [![source-duckdb](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-duckdb%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-duckdb) | | End-to-End Testing | [![source-e2e-test](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-e2e-test%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-e2e-test) | | Exchange Rates API | [![source-exchange-rates](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-exchange-rates%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-exchange-rates) | | Facebook Marketing | [![source-facebook-marketing](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-facebook-marketing%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/source-facebook-marketing) | diff --git a/airbyte-integrations/connectors/destination-duckdb/.dockerignore b/airbyte-integrations/connectors/destination-duckdb/.dockerignore new file mode 100644 index 000000000000..07bca5ba6cb9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/.dockerignore @@ -0,0 +1,5 @@ +* +!Dockerfile +!main.py +!destination_duckdb +!setup.py diff --git a/airbyte-integrations/connectors/destination-duckdb/Dockerfile b/airbyte-integrations/connectors/destination-duckdb/Dockerfile new file mode 100644 index 000000000000..159e8de1de35 --- /dev/null 
+++ b/airbyte-integrations/connectors/destination-duckdb/Dockerfile @@ -0,0 +1,42 @@ +FROM python:3.9.11 as base +# FROM python:3.9.11-alpine3.15 as base +# switched from alpine as there were tons of errors (in case you want to switch back to alpine) +# - https://stackoverflow.com/a/57485724/5246670 +# - numpy error: https://stackoverflow.com/a/22411624/5246670 +# - libstdc++ https://github.com/amancevice/docker-pandas/issues/12#issuecomment-717215043 +# - musl-dev linux-headers g++ because of: https://stackoverflow.com/a/40407099/5246670 + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apt-get update && apt-get -y upgrade \ + && pip install --upgrade pip + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . +# build a clean environment +FROM base +# RUN conda install -c conda-forge python-duckdb +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +#adding duckdb manually (outside of setup.py - lots of errors) +RUN pip install duckdb + +# copy payload code only +COPY main.py ./ +COPY destination_duckdb ./destination_duckdb + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-duckdb diff --git a/airbyte-integrations/connectors/destination-duckdb/README.md b/airbyte-integrations/connectors/destination-duckdb/README.md new file mode 100644 index 000000000000..71d69e52e39b --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/README.md @@ -0,0 +1,133 @@ +# Duckdb Destination + +This is the repository for the Duckdb destination connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/duckdb). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +python -m pip install --upgrade pip +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. 
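For reference, the `requirements.txt` added later in this diff contains only `-e .`, so the install command above is effectively an editable install of this package whose dependency list comes from `setup.py`. Below is a minimal, hedged sketch of how a new runtime dependency would be declared; the commented-out package name is hypothetical and not part of this connector:

```python
# setup.py (sketch): runtime dependencies are declared here, not in requirements.txt,
# which only contains the editable install "-e ." of this package.
MAIN_REQUIREMENTS = [
    "airbyte-cdk",
    "duckdb",
    # "some-new-package",  # hypothetical: a new runtime dependency would be appended here
]
```

After editing `setup.py`, re-run `pip install -r requirements.txt` from the connector directory so the change is picked up in your virtualenv.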
+ +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-duckdb:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/duckdb) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_duckdb/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination duckdb test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config integration_tests/config.json +python main.py discover --config integration_tests/config.json +cat integration_tests/messages.jsonl| python main.py write --config integration_tests/config.json --catalog integration_tests/configured_catalog.json +``` + + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/destination-duckdb:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-duckdb:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-duckdb:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-duckdb:dev check --config /secrets/config.json +# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages +cat integration_tests/messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-duckdb:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Coming soon: + +### Using gradle to run tests +All commands should be run from airbyte project root. 
+To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-duckdb:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-duckdb:integrationTest +``` + +To run normalization image: +``` +./gradlew :airbyte-integrations:bases:base-normalization:airbyteDockerDuckDb +docker tag airbyte/normalization-duckdb:dev airbyte/normalization-duckdb:0.2.22 +``` + + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-duckdb/build.gradle b/airbyte-integrations/connectors/destination-duckdb/build.gradle new file mode 100644 index 000000000000..fef6aa0dbd77 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/build.gradle @@ -0,0 +1,8 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' +} + +airbytePython { + moduleDirectory 'destination_duckdb' +} diff --git a/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/__init__.py b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/__init__.py new file mode 100644 index 000000000000..002b3bced46e --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .destination import DestinationDuckdb + +__all__ = ["DestinationDuckdb"] diff --git a/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py new file mode 100644 index 000000000000..598dc70d80f8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py @@ -0,0 +1,164 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import datetime +import json +import os +import uuid +from collections import defaultdict +from logging import getLogger +from typing import Any, Iterable, Mapping + +import duckdb +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.destinations import Destination +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type + +logger = getLogger("airbyte") + + +class DestinationDuckdb(Destination): + @staticmethod + def _get_destination_path(destination_path: str) -> str: + """ + Get a normalized version of the destination path. 
+ Automatically append /local/ to the start of the path + """ + if not destination_path.startswith("/local"): + destination_path = os.path.join("/local", destination_path) + + destination_path = os.path.normpath(destination_path) + if not destination_path.startswith("/local"): + raise ValueError( + f"destination_path={destination_path} is not a valid path." "A valid path shall start with /local or no / prefix" + ) + + return destination_path + + def write( + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] + ) -> Iterable[AirbyteMessage]: + + """ + Reads the input stream of messages, config, and catalog to write data to the destination. + + This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received + in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been + successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing, + then the source is given the last state message output from this method as the starting point of the next sync. + + :param config: dict of JSON configuration matching the configuration declared in spec.json + :param input_messages: The stream of input messages received from the source + :param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the destination + :return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs + """ + streams = {s.stream.name for s in configured_catalog.streams} + logger.info(f"Starting write to DuckDB with {len(streams)} streams") + + path = config.get("destination_path") + path = self._get_destination_path(path) + # check if file exists + + logger.info(f"Opening DuckDB file at {path}") + con = duckdb.connect(database=path, read_only=False) + + # create the tables if needed + # con.execute("BEGIN TRANSACTION") + for configured_stream in configured_catalog.streams: + + name = configured_stream.stream.name + table_name = f"_airbyte_raw_{name}" + if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: + # delete the tables + logger.info(f"Dropping tables for overwrite: {table_name}") + query = """ + DROP TABLE IF EXISTS {} + """.format( + table_name + ) + con.execute(query) + # create the table if needed + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + _airbyte_ab_id TEXT PRIMARY KEY, + _airbyte_emitted_at JSON, + _airbyte_data JSON + ) + """ + + con.execute(query) + + buffer = defaultdict(list) + + for message in input_messages: + + if message.type == Type.STATE: + # flush the buffer + for stream_name in buffer.keys(): + + logger.info(f"flushing buffer for state: {message}") + query = """ + INSERT INTO {table_name} + VALUES (?,?,?) 
+ """.format( + table_name=f"_airbyte_raw_{stream_name}" + ) + logger.info(f"query: {query}") + + con.executemany(query, buffer[stream_name]) + + con.commit() + buffer = defaultdict(list) + + yield message + elif message.type == Type.RECORD: + data = message.record.data + stream = message.record.stream + if stream not in streams: + logger.debug(f"Stream {stream} was not present in configured streams, skipping") + continue + + # add to buffer + buffer[stream].append((str(uuid.uuid4()), datetime.datetime.now().isoformat(), json.dumps(data))) + else: + logger.info(f"Message type {message.type} not supported, skipping") + + # flush any remaining messages + for stream_name in buffer.keys(): + + query = """ + INSERT INTO {table_name} + VALUES (?,?,?) + """.format( + table_name=f"_airbyte_raw_{stream_name}" + ) + + con.executemany(query, buffer[stream_name]) + con.commit() + + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """ + Tests if the input configuration can be used to successfully connect to the destination with the needed permissions + e.g: if a provided API token or password can be used to connect and write to the destination. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this destination, content of this json is as specified in + the properties of the spec.json file + + :return: AirbyteConnectionStatus indicating a Success or Failure + """ + try: + # parse the destination path + param_path = config.get("destination_path") + path = self._get_destination_path(param_path) + + os.makedirs(os.path.dirname(path), exist_ok=True) + con = duckdb.connect(database=path, read_only=False) + con.execute("SELECT 1;") + + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") diff --git a/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/spec.json b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/spec.json new file mode 100644 index 000000000000..9686cea6d8ad --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/spec.json @@ -0,0 +1,27 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/duckdb", + "supported_destination_sync_modes": ["overwrite", "append"], + "supportsIncremental": true, + "supportsDBT": true, + "supportsNormalization": true, + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Destination Duckdb", + "type": "object", + "required": ["destination_path"], + "additionalProperties": true, + "properties": { + "destination_path": { + "type": "string", + "description": "Path to the .duckdb file. The file will be placed inside that local mount. 
For more information check out our docs", + "example": "/local/destination.duckdb" + }, + "schema": { + "type": "string", + "description": "database schema, default for duckdb is main", + "example": "main" + } + + } + } +} diff --git a/airbyte-integrations/connectors/destination-duckdb/integration_tests/config.json b/airbyte-integrations/connectors/destination-duckdb/integration_tests/config.json new file mode 100644 index 000000000000..1e6f086a7be0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/integration_tests/config.json @@ -0,0 +1 @@ +{"destination_path": "/local/destination.duckdb"} diff --git a/airbyte-integrations/connectors/destination-duckdb/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/destination-duckdb/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..182c49551da5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/integration_tests/configured_catalog.json @@ -0,0 +1,39 @@ +{ + "streams": [ + { + "stream": { + "name": "airbyte_acceptance_table", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": { + "type": "object", + "properties": { + "column1": { + "type": "string" + }, + "column2": { + "type": "number" + }, + "column3": { + "type": "string", + "format": "datetime", + "airbyte_type": "timestamp_without_timezone" + }, + "column4": { + "type": "number" + }, + "column5": { + "type": "array", + "items": { + "type": "integer" + } + } + } + } + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} + diff --git a/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py new file mode 100644 index 000000000000..4ce0155d2c89 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py @@ -0,0 +1,151 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import json +import os +import random +import string +import tempfile +from datetime import datetime +from typing import Any, Dict +from unittest.mock import MagicMock + +import duckdb +import pytest +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStateMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Status, + SyncMode, + Type, +) +from destination_duckdb import DestinationDuckdb + + +@pytest.fixture(autouse=True) +def disable_destination_modification(monkeypatch, request): + if "disable_autouse" in request.keywords: + return + else: + monkeypatch.setattr(DestinationDuckdb, "_get_destination_path", lambda _, x: x) + + +@pytest.fixture(scope="module") +def local_file_config() -> Dict[str, str]: + # create a file "myfile" in "mydir" in temp directory + tmp_dir = tempfile.TemporaryDirectory() + test = os.path.join(str(tmp_dir), "test.duckdb") + + # f1.write_text("text to myfile") + yield {"destination_path": test} + + +@pytest.fixture(scope="module") +def test_table_name() -> str: + letters = string.ascii_lowercase + rand_string = "".join(random.choice(letters) for _ in range(10)) + return f"airbyte_integration_{rand_string}" + + +@pytest.fixture +def table_schema() -> str: + schema = {"type": "object", "properties": {"column1": {"type": ["null", "string"]}}} + return schema + + +@pytest.fixture +def configured_catalogue(test_table_name: str, table_schema: str) -> ConfiguredAirbyteCatalog: + append_stream = ConfiguredAirbyteStream( + stream=AirbyteStream( + name=test_table_name, json_schema=table_schema, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental] + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + return ConfiguredAirbyteCatalog(streams=[append_stream]) + + +@pytest.fixture +def invalid_config() -> Dict[str, str]: + return {"destination_path": "/destination.duckdb"} + + +@pytest.fixture +def airbyte_message1(test_table_name: str): + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=test_table_name, data={"key1": "value1", "key2": 3}, emitted_at=int(datetime.now().timestamp()) * 1000 + ), + ) + + +@pytest.fixture +def airbyte_message2(test_table_name: str): + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=test_table_name, data={"key1": "value2", "key2": 2}, emitted_at=int(datetime.now().timestamp()) * 1000 + ), + ) + + +@pytest.fixture +def airbyte_message3(): + return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data={"state": "1"})) + + +@pytest.mark.parametrize("config", ["invalid_config"]) +@pytest.mark.disable_autouse +def test_check_fails(config, request): + config = request.getfixturevalue(config) + destination = DestinationDuckdb() + status = destination.check(logger=MagicMock(), config=config) + assert status.status == Status.FAILED + + +@pytest.mark.parametrize("config", ["local_file_config"]) +def test_check_succeeds(config, request): + config = request.getfixturevalue(config) + destination = DestinationDuckdb() + status = destination.check(logger=MagicMock(), config=config) + assert status.status == Status.SUCCEEDED + + +def _state(data: Dict[str, Any]) -> AirbyteMessage: + return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=data)) + + +@pytest.mark.parametrize("config", ["local_file_config"]) +def test_write( + config: Dict[str, str], + request, + configured_catalogue: ConfiguredAirbyteCatalog, + airbyte_message1: 
AirbyteMessage, + airbyte_message2: AirbyteMessage, + airbyte_message3: AirbyteMessage, + test_table_name: str, +): + config = request.getfixturevalue(config) + destination = DestinationDuckdb() + generator = destination.write(config, configured_catalogue, [airbyte_message1, airbyte_message2, airbyte_message3]) + + result = list(generator) + assert len(result) == 1 + + con = duckdb.connect(database=config.get("destination_path"), read_only=False) + with con: + cursor = con.execute( + f"SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data FROM _airbyte_raw_{test_table_name} ORDER BY _airbyte_data" + ) + result = cursor.fetchall() + + assert len(result) == 2 + assert result[0][2] == json.dumps(airbyte_message1.record.data) + assert result[1][2] == json.dumps(airbyte_message2.record.data) diff --git a/airbyte-integrations/connectors/destination-duckdb/integration_tests/invalid_config.json b/airbyte-integrations/connectors/destination-duckdb/integration_tests/invalid_config.json new file mode 100644 index 000000000000..95e38c64cd40 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "destination_path": "//something-not-valid" +} diff --git a/airbyte-integrations/connectors/destination-duckdb/integration_tests/messages.jsonl b/airbyte-integrations/connectors/destination-duckdb/integration_tests/messages.jsonl new file mode 100644 index 000000000000..09f528422e30 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/integration_tests/messages.jsonl @@ -0,0 +1 @@ +{"type": "RECORD", "record": {"stream": "airbyte_acceptance_table", "emitted_at": 1664705198575, "data": { "column1": "test", "column2": 222, "column3": "2022-06-20T18:56:18", "column4": 33.33, "column5": [1,2,null]}}} diff --git a/airbyte-integrations/connectors/destination-duckdb/main.py b/airbyte-integrations/connectors/destination-duckdb/main.py new file mode 100644 index 000000000000..a812a22f9aa8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/main.py @@ -0,0 +1,11 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from destination_duckdb import DestinationDuckdb + +if __name__ == "__main__": + DestinationDuckdb().run(sys.argv[1:]) diff --git a/airbyte-integrations/connectors/destination-duckdb/requirements.txt b/airbyte-integrations/connectors/destination-duckdb/requirements.txt new file mode 100644 index 000000000000..d6e1198b1ab1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/airbyte-integrations/connectors/destination-duckdb/setup.py b/airbyte-integrations/connectors/destination-duckdb/setup.py new file mode 100644 index 000000000000..a0fd84f64845 --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/setup.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = ["airbyte-cdk", "duckdb"] # duckdb added manually to dockerfile due to lots of errors + +TEST_REQUIREMENTS = ["pytest~=6.1"] + +setup( + name="destination_duckdb", + description="Destination implementation for Duckdb.", + author="Simon Späti", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py new file mode 100644 index 000000000000..67c17d620f2a --- /dev/null +++ b/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest +from destination_duckdb import DestinationDuckdb + + +def test_read_invalid_path(): + + invalid_input = "/test.duckdb" + with pytest.raises(ValueError): + _ = DestinationDuckdb._get_destination_path(invalid_input) + + assert True diff --git a/build.gradle b/build.gradle index 8b11def5547a..079ae33f9a26 100644 --- a/build.gradle +++ b/build.gradle @@ -120,6 +120,7 @@ def createSpotlessTarget = { pattern -> 'dbt-project-template-clickhouse', 'dbt-project-template-snowflake', 'dbt-project-template-tidb', + 'dbt-project-template-duckdb', 'dbt_test_config', 'normalization_test_output', 'tools', diff --git a/docs/integrations/destinations/duckdb.md b/docs/integrations/destinations/duckdb.md new file mode 100644 index 000000000000..e6262f5b84b0 --- /dev/null +++ b/docs/integrations/destinations/duckdb.md @@ -0,0 +1,84 @@ + +# DuckDB + +:::danger + +This destination is meant to be used on a local workstation and won't work on Kubernetes. + +::: + +## Overview + +[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system, and this destination is meant to be used locally if you have multiple smaller sources, such as GitHub repos, some social media, and local CSVs or files, that you want to run analytics workloads on. + +This destination writes data to a file on the _local_ filesystem on the host running Airbyte. By default, data is written to `/tmp/airbyte_local`. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte. + +### Sync Overview + +#### Output schema + +If you enable [Normalization](https://docs.airbyte.com/understanding-airbyte/basic-normalization/), source data will be normalized to a tabular form. Let's say you have a source such as GitHub with nested JSONs; Normalization ensures you end up with tables and columns. Suppose you have a many-to-many relationship between users and commits; Normalization will create separate tables for it. The end state is the [third normal form](https://en.wikipedia.org/wiki/Third_normal_form) (3NF). + +If you turn off Normalization, each stream will be output into its own table `_airbyte_raw_{stream_name}`. Each table will contain 3 columns (see the example query after this list): + +* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. +* `_airbyte_data`: a json blob representing the event data.
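To make the raw layout concrete, the sketch below opens the DuckDB file produced by a sync and reads one raw table back with the `duckdb` Python package. The `/tmp/airbyte_local/destination.duckdb` path and the `airbyte_acceptance_table` stream name are only examples (the default local mount and the stream used by this connector's integration tests); substitute your own `destination_path` and stream name.

```python
# Sketch: inspect a raw table written by the DuckDB destination.
import json

import duckdb

# Default local mount plus the example destination_path from this page.
con = duckdb.connect("/tmp/airbyte_local/destination.duckdb", read_only=True)

rows = con.execute(
    "SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data "
    "FROM _airbyte_raw_airbyte_acceptance_table"
).fetchall()

for ab_id, emitted_at, raw_data in rows:
    record = json.loads(raw_data)  # _airbyte_data is stored as a JSON string
    print(ab_id, emitted_at, record)
```

The same `SELECT` also works from the `duckdb` CLI session described in the "Access Replicated Data Files" section below.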
+ +#### Features + +| Feature | Supported | | +| :--- | :--- | :--- | +| Full Refresh Sync | Yes | | +| Incremental - Append Sync | Yes | | +| Incremental - Deduped History | No | | +| Namespaces | No | | + +#### Performance considerations + +This integration will be constrained by the speed at which your filesystem accepts writes. + +## Getting Started + +The `destination_path` will always start with `/local` whether it is specified by the user or not. Any directory nesting within local will be mapped onto the local mount. + +By default, the `LOCAL_ROOT` env variable in the `.env` file is set to `/tmp/airbyte_local`. + +The local mount is mounted by Docker onto `LOCAL_ROOT`. This means `/local` is substituted by `/tmp/airbyte_local` by default. + +:::caution + +Please make sure that Docker Desktop has access to `/tmp` (and `/private` on macOS, as `/tmp` is a symlink that points to `/private`; it will not work otherwise). You can allow it with "File sharing" in `Settings -> Resources -> File sharing -> add the one or two folders above` and hit the "Apply & restart" button. + +::: + + +### Example: + +* If `destination_path` is set to `/local/destination.duckdb` +* and the local mount is using the `/tmp/airbyte_local` default, +* then all data will be written to `/tmp/airbyte_local/destination.duckdb`. + +## Access Replicated Data Files + +If your Airbyte instance is running on the same computer that you are browsing from, you can open your browser and enter [file:///tmp/airbyte\_local](file:///tmp/airbyte_local) to look at the replicated data locally. If that approach fails, or if your Airbyte instance is running on a remote server, follow these steps to access the replicated files: + +1. Access the server container using `docker exec -it airbyte-server bash` +2. Navigate to the default local mount using `cd /tmp/airbyte_local` +3. Navigate to the replicated file directory you specified when you created the destination, using `cd /{destination_path}` +4. Execute `duckdb {filename}` to access the data in a particular database file. + +You can also copy the output file to your host machine; the following command copies the file to your current working directory: + +```text +docker cp airbyte-server:/tmp/airbyte_local/{destination_path} . +``` + +Note: If you are running Airbyte on Windows with Docker backed by WSL2, you have to follow similar steps as above, or refer to this [link](../../operator-guides/locating-files-local-destination.md) for an alternative approach. + +## Changelog + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2022-10-14 | [17494](https://github.com/airbytehq/airbyte/pull/17494) | New DuckDB destination | + diff --git a/settings.gradle b/settings.gradle index de388a86fbd0..e24aa6c7fd92 100644 --- a/settings.gradle +++ b/settings.gradle @@ -147,6 +147,7 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-integrations:connectors:destination-mssql' include ':airbyte-integrations:connectors:destination-clickhouse' include ':airbyte-integrations:connectors:destination-tidb' + include ':airbyte-integrations:connectors:destination-duckdb' //Needed by destination-bigquery include ':airbyte-integrations:connectors:destination-s3'