🎉 Refactor Normalization docker images and upgrade to use dbt 0.21.0 #6959

Merged Oct 14, 2021 · 39 commits
Changes from 7 commits

Commits
9709f14
Split normalization docker images for some connectors with specifics …
ChristopheDuong Oct 8, 2021
bd90eee
Clean up schemas
ChristopheDuong Oct 11, 2021
6b7ff77
Use global version for normalization
ChristopheDuong Oct 11, 2021
3aa02dc
Trying to solve oracle tests
ChristopheDuong Oct 11, 2021
fb7c12f
Use EnvConfigs in main()
ChristopheDuong Oct 11, 2021
16addd9
test_ephemeral should use get_test_targets
ChristopheDuong Oct 11, 2021
f07bb4b
Postgres always in test_ephemeral targets
ChristopheDuong Oct 11, 2021
f5bcc85
format code
ChristopheDuong Oct 11, 2021
c7d3d1a
Tweak gradle dependencies
ChristopheDuong Oct 11, 2021
8d4a9bc
Tweak settings.gradle
ChristopheDuong Oct 12, 2021
6dd5c48
Fix test oracle
ChristopheDuong Oct 12, 2021
e4090ea
tweak settings.gradle
ChristopheDuong Oct 12, 2021
b1b7ffe
Merge remote-tracking branch 'origin/master' into split-normalization
ChristopheDuong Oct 12, 2021
672ace9
format code
ChristopheDuong Oct 12, 2021
b2c7f70
Fix bigquery ephemeral test
ChristopheDuong Oct 12, 2021
f4f0cd2
Merge remote-tracking branch 'origin/master' into split-normalization
ChristopheDuong Oct 12, 2021
c6d3de3
Format code
ChristopheDuong Oct 12, 2021
f13a404
Tweak comments
ChristopheDuong Oct 12, 2021
be595cc
Fix tests
ChristopheDuong Oct 12, 2021
ae0a511
Fix integration tests
ChristopheDuong Oct 13, 2021
259afe9
Merge remote-tracking branch 'origin/master' into split-normalization
ChristopheDuong Oct 13, 2021
ccad8c3
tweak build
ChristopheDuong Oct 13, 2021
e912947
Re-enable test_check_row_count
ChristopheDuong Oct 13, 2021
1ad9ca4
add missing folder
ChristopheDuong Oct 13, 2021
84084c8
rename test file
ChristopheDuong Oct 13, 2021
5e9ded1
Spotless settings
ChristopheDuong Oct 13, 2021
ec677cc
Fix snowflake uppercse test
ChristopheDuong Oct 13, 2021
96e1333
Fix oracle tests
ChristopheDuong Oct 13, 2021
05c12e1
Apply suggestions from code review
ChristopheDuong Oct 13, 2021
e028ed7
Merge remote-tracking branch 'origin/master' into split-normalization
ChristopheDuong Oct 14, 2021
c461a8f
Add env variables to test for using external db
ChristopheDuong Oct 14, 2021
e48f17a
Split integration tests between simple and nested
ChristopheDuong Oct 14, 2021
70afb15
Merge remote-tracking branch 'origin/master' into split-normalization
ChristopheDuong Oct 14, 2021
c281f1f
code format
ChristopheDuong Oct 14, 2021
9f0bcc8
Add column with quotes in simple streams
ChristopheDuong Oct 14, 2021
18fb503
format code
ChristopheDuong Oct 14, 2021
cb9889c
Fix tests
ChristopheDuong Oct 14, 2021
881cb68
Cleanup dir before running tests
ChristopheDuong Oct 14, 2021
2129ac8
Regenerate (#7003)
ChristopheDuong Oct 14, 2021
3 changes: 3 additions & 0 deletions airbyte-integrations/bases/base-normalization/.dockerignore
@@ -5,3 +5,6 @@
!setup.py
!normalization
!dbt-project-template
!dbt-project-template-mssql
!dbt-project-template-mysql
!dbt-project-template-oracle
39 changes: 1 addition & 38 deletions airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,36 +1,4 @@
FROM fishtownanalytics/dbt:0.19.0

USER root
WORKDIR /tmp
RUN apt-get update && apt-get install -y \
wget \
curl \
unzip \
libaio-dev \
libaio1 \
gnupg \
gnupg1 \
gnupg2

# Install MS SQL Server dependencies
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list
RUN apt-get update && ACCEPT_EULA=Y apt-get install -y \
libgssapi-krb5-2 \
unixodbc-dev \
msodbcsql17 \
mssql-tools
ENV PATH=$PATH:/opt/mssql-tools/bin

# Install Oracle dependencies
RUN mkdir -p /opt/oracle
RUN wget https://download.oracle.com/otn_software/linux/instantclient/19600/instantclient-basic-linux.x64-19.6.0.0.0dbru.zip
RUN unzip instantclient-basic-linux.x64-19.6.0.0.0dbru.zip -d /opt/oracle
ENV ORACLE_HOME /opt/oracle/instantclient_19_6
ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$ORACLE_HOME
ENV TNS_ADMIN /opt/oracle/instantclient_19_6/network/admin
RUN pip install cx_Oracle

FROM fishtownanalytics/dbt:0.21.0
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

# Install SSH Tunneling dependencies
@@ -50,10 +18,6 @@ RUN pip install .

WORKDIR /airbyte/normalization_code
RUN pip install .
RUN pip install dbt-oracle==0.4.3
RUN pip install git+https://github.com/dbeatty10/dbt-mysql@96655ea9f7fca7be90c9112ce8ffbb5aac1d3716#egg=dbt-mysql
RUN pip install dbt-sqlserver==0.19.3


WORKDIR /airbyte/normalization_code/dbt-template/
# Download external dbt dependencies
@@ -63,5 +27,4 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.52
LABEL io.airbyte.name=airbyte/normalization
9 changes: 9 additions & 0 deletions airbyte-integrations/bases/base-normalization/README.md
@@ -108,6 +108,15 @@ or can also be invoked on github, thanks to the slash commands posted as comments

/test connector=bases/base-normalization

You can restrict the tests to a subset of destinations by specifying a comma-separated list of destination names.
For example, let's say you are working on a change to normalization for Postgres, with Gradle:

NORMALIZATION_TEST_TARGET=postgres ./gradlew :airbyte-integrations:bases:base-normalization:integrationTest

or directly with pytest:

NORMALIZATION_TEST_TARGET=postgres pytest airbyte-integrations/bases/base-normalization/integration_tests
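
To run against several destinations at once, the same variable accepts a comma-separated list. For example (an illustrative combination of two supported destinations):

NORMALIZATION_TEST_TARGET=postgres,mysql pytest airbyte-integrations/bases/base-normalization/integration_tests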

Note that these tests connect to and process data on real data warehouse destinations.
Therefore, valid credentials files are expected to be present in the `secrets/` folder in order to run
(they are not included in the git repository).
42 changes: 39 additions & 3 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -1,3 +1,5 @@
import java.nio.file.Paths

plugins {
id 'airbyte-docker'
id 'airbyte-python'
@@ -27,13 +29,13 @@ task checkSshScriptCopy(type: Task, dependsOn: copySshScript) {
}
}

test.dependsOn checkSshScriptCopy
assemble.dependsOn checkSshScriptCopy
airbyteDocker.dependsOn(checkSshScriptCopy)
test.dependsOn(checkSshScriptCopy)

installReqs.dependsOn(":airbyte-integrations:bases:airbyte-protocol:installReqs")
integrationTest.dependsOn(build)

task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs){
task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) {
module = "pytest"
command = "-s integration_tests"

@@ -59,3 +61,37 @@ task('mypyCheck', type: PythonTask) {
dependsOn 'blackFormat'
}
check.dependsOn mypyCheck

static def getDockerfile(String customConnector) {
return "${customConnector}.Dockerfile"
}

static def getDockerImageName(String customConnector) {
return "airbyte/normalization-${customConnector}"
}

static def getImageNameWithTag(String customConnector) {
return "${getDockerImageName(customConnector)}:dev"
}


def buildAirbyteDocker(String customConnector) {
def baseCommand = ['docker', 'build', '.', '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector)]
return {
commandLine baseCommand
}
}

task airbyteDockerMSSql(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('mssql')
}
task airbyteDockerMySql(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('mysql')
}
task airbyteDockerOracle(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('oracle')
}

airbyteDocker.dependsOn(airbyteDockerMSSql)
airbyteDocker.dependsOn(airbyteDockerMySql)
airbyteDocker.dependsOn(airbyteDockerOracle)
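
For reference, each of these Exec tasks simply shells out to docker build with a connector-specific Dockerfile and a dev image tag. A minimal sketch of the equivalent manual commands, assuming they are run from the base-normalization module directory where these Dockerfiles live (as the relative paths above imply):

# equivalents of the airbyteDockerMSSql / airbyteDockerMySql / airbyteDockerOracle tasks
docker build . -f mssql.Dockerfile -t airbyte/normalization-mssql:dev
docker build . -f mysql.Dockerfile -t airbyte/normalization-mysql:dev
docker build . -f oracle.Dockerfile -t airbyte/normalization-oracle:dev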
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# add dependencies. these will get pulled during the `dbt deps` process.

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.6.4
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# add dependencies. these will get pulled during the `dbt deps` process.

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.6.4
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# add dependencies. these will get pulled during the `dbt deps` process.

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.6.4
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
#}

{% macro concat(fields) -%}
{{ adapter.dispatch('concat', packages = ['airbyte_utils', 'dbt_utils'])(fields) }}
{{ adapter.dispatch('concat')(fields) }}
{%- endmacro %}

{% macro postgres__concat(fields) %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{#
Drop schema to clean up the destination database
#}
{% macro drop_schemas(schemas) %}
{% for schema in schemas %}
drop schema if exists {{ schema }} cascade;
{% endfor %}
{% endmacro %}
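
A hypothetical CLI invocation of this macro via dbt's run-operation command, using the test schema name from the integration tests as an example argument (note that as written the macro only renders the DROP statements; actually executing them would require the rendered SQL to be run by the caller):

# illustrative only: "test_normalization" is the default target schema used by the integration tests
dbt run-operation drop_schemas --args '{"schemas": ["test_normalization"]}'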
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{% macro mysql__except() %}
{% do exceptions.warn("MySQL does not support EXCEPT operator") %}
{% endmacro %}

{% macro oracle__except() %}
minus
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -58,3 +58,60 @@
select count from final

{% endmacro %}

{% macro oracle__test_equality(model, compare_model, compare_columns=None) %}

{% set set_diff %}
count(*) + coalesce(abs(
sum(case when which_diff = 'a_minus_b' then 1 else 0 end) -
sum(case when which_diff = 'b_minus_a' then 1 else 0 end)
), 0)
{% endset %}

{#-- Needs to be set at parse time, before we return '' below --#}
{{ config(fail_calc = set_diff) }}

{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
{%- if not execute -%}
{{ return('') }}
{% endif %}

-- setup
{%- do dbt_utils._is_relation(model, 'test_equality') -%}

{#-
If the compare_cols arg is provided, we can run this test without querying the
information schema — this allows the model to be an ephemeral model
-#}

{%- if not compare_columns -%}
{%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
{%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%}
{%- endif -%}

{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (
select * from {{ model }}
),
b as (
select * from {{ compare_model }}
),
a_minus_b as (
select {{compare_cols_csv}} from a
{{ dbt_utils.except() }}
select {{compare_cols_csv}} from b
),

b_minus_a as (
select {{compare_cols_csv}} from b
{{ dbt_utils.except() }}
select {{compare_cols_csv}} from a
),
unioned as (
select * from a_minus_b
union all
select * from b_minus_a
)
select * from unioned
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -2,4 +2,4 @@

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.6.4
revision: 0.7.3
7 changes: 7 additions & 0 deletions airbyte-integrations/bases/base-normalization/entrypoint.sh
@@ -26,6 +26,13 @@ function configuredbt() {
# No git repository provided, use the dbt-template folder (shipped inside normalization docker image)
# as the base folder for dbt workspace
cp -r /airbyte/normalization_code/dbt-template/* "${PROJECT_DIR}"
if [ "${INTEGRATION_TYPE}" == "MSSQL" ]; then
cp -r /airbyte/normalization_code/dbt-template-mssql/* "${PROJECT_DIR}"
elif [ "${INTEGRATION_TYPE}" == "MYSQL" ]; then
cp -r /airbyte/normalization_code/dbt-template-mysql/* "${PROJECT_DIR}"
elif [ "${INTEGRATION_TYPE}" == "ORACLE" ]; then
cp -r /airbyte/normalization_code/dbt-template-oracle/* "${PROJECT_DIR}"
fi
echo "Running: transform-config --config ${CONFIG_FILE} --integration-type ${INTEGRATION_TYPE} --out ${PROJECT_DIR}"
# Generate a profiles.yml file for the selected destination/integration type
transform-config --config "${CONFIG_FILE}" --integration-type "${INTEGRATION_TYPE}" --out "${PROJECT_DIR}"
Original file line number Diff line number Diff line change
@@ -19,23 +19,36 @@
from normalization.destination_type import DestinationType
from normalization.transform_config.transform import TransformConfig

NORMALISATION_TEST_TARGET = "NORMALIZATION_TEST_TARGET"


class DbtIntegrationTest(object):
def __init__(self):
self.target_schema = "test_normalization"
self.container_prefix = f"test_normalization_db_{self.random_string(3)}"
self.db_names = ["postgres", "mysql", "mssql"]
self.db_names = []

@staticmethod
def generate_random_string(prefix: str) -> str:
return prefix + DbtIntegrationTest.random_string(5)

@staticmethod
def random_string(length: int) -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(length))

def setup_db(self):
self.setup_postgres_db()
self.setup_mysql_db()
self.setup_mssql_db()
def set_target_schema(self, target_schema: str):
self.target_schema = target_schema

def setup_db(self, destinations_to_test: List[str]):
if DestinationType.POSTGRES.value in destinations_to_test:
self.setup_postgres_db()
if DestinationType.MYSQL.value in destinations_to_test:
self.setup_mysql_db()
if DestinationType.MSSQL.value in destinations_to_test:
self.setup_mssql_db()

def setup_postgres_db(self):
self.db_names.append("postgres")
print("Starting localhost postgres container for tests")
port = self.find_free_port()
config = {
@@ -78,6 +91,7 @@ def setup_postgres_db(self):
fh.write(json.dumps(config))

def setup_mysql_db(self):
self.db_names.append("mysql")
print("Starting localhost mysql container for tests")
port = self.find_free_port()
config = {
@@ -115,6 +129,7 @@ def setup_mysql_db(self):
fh.write(json.dumps(config))

def setup_mssql_db(self):
self.db_names.append("mssql")
print("Starting localhost MS SQL Server container for tests")
port = self.find_free_port()
config = {
@@ -262,20 +277,32 @@ def writer():
process.wait()
return process.returncode == 0

def dbt_run(self, test_root_dir: str):
@staticmethod
def get_normalization_image(destination_type: DestinationType) -> str:
if DestinationType.MSSQL.value == destination_type.value:
return "airbyte/normalization-mssql:dev"
elif DestinationType.MYSQL.value == destination_type.value:
return "airbyte/normalization-mysql:dev"
elif DestinationType.ORACLE.value == destination_type.value:
return "airbyte/normalization-oracle:dev"
else:
return "airbyte/normalization:dev"

def dbt_run(self, destination_type: DestinationType, test_root_dir: str):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination
"""
normalization_image: str = self.get_normalization_image(destination_type)
# Perform sanity check on dbt project settings
assert self.run_check_dbt_command("debug", test_root_dir)
assert self.run_check_dbt_command("deps", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "debug", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "deps", test_root_dir)
final_sql_files = os.path.join(test_root_dir, "final")
shutil.rmtree(final_sql_files, ignore_errors=True)
# Compile dbt models files into destination sql dialect, then run the transformation queries
assert self.run_check_dbt_command("run", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir)

@staticmethod
def run_check_dbt_command(command: str, cwd: str) -> bool:
def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> bool:
"""
Run the dbt subprocess while checking for and counting "ERROR", "FAIL" or "WARNING" messages printed in its output
"""
@@ -298,7 +325,7 @@ def run_check_dbt_command(command: str, cwd: str) -> bool:
"--entrypoint",
"/usr/local/bin/dbt",
"-i",
"airbyte/normalization:dev",
normalization_image,
command,
"--profiles-dir=/workspace",
"--project-dir=/workspace",
Expand All @@ -321,6 +348,7 @@ def run_check_dbt_command(command: str, cwd: str) -> bool:
"Nothing to do.", # When no schema/data tests are setup
"Configuration paths exist in your dbt_project.yml", # When no cte / view are generated
"Error loading config file: .dockercfg: $HOME is not defined", # ignore warning
"depends on a node named 'disabled_test' which was not found", # Tests throwing warning because it is disabled
]:
if except_clause in str_line:
is_exception = True
Expand Down Expand Up @@ -374,3 +402,18 @@ def copy_replace(src, dst, pattern=None, replace_value=None):
file1.close()
if isinstance(dst, str):
file2.close()

@staticmethod
def get_test_targets() -> List[str]:
"""
Returns a list of destinations to run tests on.

If the environment variable NORMALIZATION_TEST_TARGET is set to a comma-separated list of destination names,
then tests are run only on that subset of destinations.
Otherwise, tests are run against all destinations.
"""
if os.getenv(NORMALISATION_TEST_TARGET):
target_str = os.getenv(NORMALISATION_TEST_TARGET)
return [d.value for d in {DestinationType.from_string(s) for s in target_str.split(",")}]
else:
return [d.value for d in DestinationType]