diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 98a9e270c62a5..970e0385fd834 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -166,15 +166,16 @@ def _sanitize_uri(inp: str | ObjectStoragePath) -> str: return uri if normalized_scheme == "airflow": raise ValueError("Asset scheme 'airflow' is reserved") - _, auth_exists, normalized_netloc = parsed.netloc.rpartition("@") - if auth_exists: + if parsed.password: # TODO: Collect this into a DagWarning. warnings.warn( - "An Asset URI should not contain auth info (e.g. username or " - "password). It has been automatically dropped.", + "An Asset URI should not contain a password. User info has been automatically dropped.", UserWarning, stacklevel=3, ) + _, _, normalized_netloc = parsed.netloc.rpartition("@") + else: + normalized_netloc = parsed.netloc if parsed.query: normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query))) else: diff --git a/task-sdk/tests/task_sdk/definitions/test_asset.py b/task-sdk/tests/task_sdk/definitions/test_asset.py index e9be2d297dee3..9348bd26d9415 100644 --- a/task-sdk/tests/task_sdk/definitions/test_asset.py +++ b/task-sdk/tests/task_sdk/definitions/test_asset.py @@ -144,19 +144,24 @@ def test_uri_with_scheme(uri: str, normalized: str) -> None: assert os.fspath(asset) == normalized -def test_uri_with_auth() -> None: - with pytest.warns(UserWarning, match="username") as record: - asset = Asset("ftp://user@localhost/foo.txt") +def test_uri_with_password() -> None: + with pytest.warns(UserWarning, match="password") as record: + asset = Asset("ftp://user:password@localhost/foo.txt") assert len(record) == 1 assert str(record[0].message) == ( - "An Asset URI should not contain auth info (e.g. username or " - "password). It has been automatically dropped." + "An Asset URI should not contain a password. User info has been automatically dropped." ) EmptyOperator(task_id="task1", outlets=[asset]) assert asset.uri == "ftp://localhost/foo.txt" assert os.fspath(asset) == "ftp://localhost/foo.txt" +def test_uri_without_password() -> None: + uri = "abfss://filesystem@account.dfs.core.windows.net/path" + asset = Asset(uri) + assert asset.uri == uri + + def test_uri_without_scheme(): asset = Asset(uri="example_asset") EmptyOperator(task_id="task1", outlets=[asset])