diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 2ba2b18ff..701552f56 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -3,6 +3,7 @@
 import json
 import os
 import tempfile
+import urllib.parse
 import warnings
 from abc import ABC, abstractmethod
 from functools import cached_property
@@ -449,7 +450,7 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
         uris = []
         for completed in self.openlineage_events_completes:
             for output in getattr(completed, source):
-                dataset_uri = output.namespace + "/" + output.name
+                dataset_uri = output.namespace + "/" + urllib.parse.quote(output.name)
                 uris.append(dataset_uri)
         self.log.debug("URIs to be converted to Dataset: %s", uris)
diff --git "a/dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" "b/dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql"
new file mode 100644
index 000000000..b017c8d0c
--- /dev/null
+++ "b/dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql"
@@ -0,0 +1,2 @@
+select
+    'TEST_FOR_MULTIBYTE_CHARACTERS'
diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst
index 4b2535e07..068998de5 100644
--- a/docs/configuration/render-config.rst
+++ b/docs/configuration/render-config.rst
@@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use
 
 The ``RenderConfig`` class takes the following arguments:
 
-- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies `_.
+- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies `_. If a model in the project has a name containing multibyte characters, the dataset name will be URL-encoded.
 - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior `_ section.
 - ``load_method``: how to load your dbt project. See `Parsing Methods `_ for more information.
 - ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding `_ for more information.
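Note on the cosmos/operators/local.py change above: a minimal sketch (not part of the diff; the namespace and name values are illustrative, standing in for the fields of an OpenLineage output event) of what urllib.parse.quote does to a model name containing fullwidth characters:

    import urllib.parse

    # Illustrative values: the namespace is the connection URI and the name is
    # the fully qualified model name, here containing fullwidth (multibyte)
    # characters from the new fixture model.
    namespace = "postgres://0.0.0.0:5432"
    name = "postgres.public.ｍｕｌｔｉｂｙｔｅ"

    # quote() percent-encodes the UTF-8 bytes of any character outside the
    # unreserved set, leaving ASCII letters, digits, ".", "-", "_", "~" and "/"
    # intact, so the dataset URI stays valid.
    print(namespace + "/" + urllib.parse.quote(name))
    # postgres://0.0.0.0:5432/postgres.public.%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85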
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index effd604fa..d54bbb5e1 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -489,6 +489,36 @@ def test_run_operator_dataset_emission_is_skipped(caplog):
     assert run_operator.outlets == []
 
 
+@pytest.mark.skipif(
+    version.parse(airflow_version) < version.parse("2.4")
+    or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
+    reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
+)
+@pytest.mark.integration
+def test_run_operator_dataset_url_encoded_names(caplog):
+    from airflow.datasets import Dataset
+
+    with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+        run_operator = DbtRunLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=Path(__file__).parent.parent.parent / "dev/dags/dbt/simple",
+            task_id="run",
+            dbt_cmd_flags=["--models", "multibyte"],
+            install_deps=True,
+            append_env=True,
+        )
+        run_operator
+
+    run_test_dag(dag)
+
+    assert run_operator.outlets == [
+        Dataset(
+            uri="postgres://0.0.0.0:5432/postgres.public.%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85",
+            extra=None,
+        )
+    ]
+
+
 @pytest.mark.integration
 def test_run_operator_caches_partial_parsing(caplog, tmp_path):
     caplog.set_level(logging.DEBUG)
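For reference, a quick sanity check (again not part of the diff) showing that the percent-encoded suffix in the expected Dataset URI decodes back to the fullwidth name of the new fixture model:

    import urllib.parse

    encoded = "%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85"
    # unquote() reverses quote(): the percent-encoded UTF-8 bytes decode back
    # to the fullwidth model name, so the encoding is lossless.
    assert urllib.parse.unquote(encoded) == "ｍｕｌｔｉｂｙｔｅ"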