Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions task-sdk/src/airflow/sdk/definitions/asset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,16 @@ def _sanitize_uri(inp: str | ObjectStoragePath) -> str:
return uri
if normalized_scheme == "airflow":
raise ValueError("Asset scheme 'airflow' is reserved")
_, auth_exists, normalized_netloc = parsed.netloc.rpartition("@")
if auth_exists:
if parsed.password:
# TODO: Collect this into a DagWarning.
warnings.warn(
"An Asset URI should not contain auth info (e.g. username or "
"password). It has been automatically dropped.",
"An Asset URI should not contain a password. User info has been automatically dropped.",
UserWarning,
stacklevel=3,
)
_, _, normalized_netloc = parsed.netloc.rpartition("@")
else:
normalized_netloc = parsed.netloc
if parsed.query:
normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query)))
else:
Expand Down
15 changes: 10 additions & 5 deletions task-sdk/tests/task_sdk/definitions/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,24 @@ def test_uri_with_scheme(uri: str, normalized: str) -> None:
assert os.fspath(asset) == normalized


def test_uri_with_auth() -> None:
with pytest.warns(UserWarning, match="username") as record:
asset = Asset("ftp://user@localhost/foo.txt")
def test_uri_with_password() -> None:
with pytest.warns(UserWarning, match="password") as record:
asset = Asset("ftp://user:password@localhost/foo.txt")
assert len(record) == 1
assert str(record[0].message) == (
"An Asset URI should not contain auth info (e.g. username or "
"password). It has been automatically dropped."
"An Asset URI should not contain a password. User info has been automatically dropped."
)
EmptyOperator(task_id="task1", outlets=[asset])
assert asset.uri == "ftp://localhost/foo.txt"
assert os.fspath(asset) == "ftp://localhost/foo.txt"


def test_uri_without_password() -> None:
uri = "abfss://filesystem@account.dfs.core.windows.net/path"
asset = Asset(uri)
assert asset.uri == uri


def test_uri_without_scheme():
asset = Asset(uri="example_asset")
EmptyOperator(task_id="task1", outlets=[asset])
Expand Down