From f0a7dbdcbbeac39f754669279518eab6063f4bf0 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Sat, 15 Mar 2025 20:55:24 +0100 Subject: [PATCH] convert non-absolute file path to prevent namespace explosion Signed-off-by: Maciej Obuchowski --- .../providers/common/io/assets/file.py | 10 +++++++- .../tests/unit/common/io/assets/test_file.py | 23 ++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/providers/common/io/src/airflow/providers/common/io/assets/file.py b/providers/common/io/src/airflow/providers/common/io/assets/file.py index 73b2ef0c9534d..5498efea8ef97 100644 --- a/providers/common/io/src/airflow/providers/common/io/assets/file.py +++ b/providers/common/io/src/airflow/providers/common/io/assets/file.py @@ -33,7 +33,8 @@ def create_asset(*, path: str, extra=None) -> Asset: - # We assume that we get absolute path starting with / + if path.startswith("file://"): + path = path[len("file://") :] return Asset(uri=f"file://{path}", extra=extra) @@ -52,6 +53,13 @@ def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDa from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset parsed = urllib.parse.urlsplit(asset.uri) + + # Non-remote path + if parsed.path == "/": + netloc = parsed.netloc + if not netloc.startswith("/"): + netloc = "/" + netloc + return OpenLineageDataset(namespace="file", name=netloc) return OpenLineageDataset( namespace=f"file://{parsed.netloc}" if parsed.netloc else "file", name=parsed.path ) diff --git a/providers/common/io/tests/unit/common/io/assets/test_file.py b/providers/common/io/tests/unit/common/io/assets/test_file.py index d2dc48d845e8e..a5e533191c0be 100644 --- a/providers/common/io/tests/unit/common/io/assets/test_file.py +++ b/providers/common/io/tests/unit/common/io/assets/test_file.py @@ -47,21 +47,32 @@ def test_sanitize_uri_invalid(uri): sanitize_uri(urlsplit(uri)) -def test_file_asset(): - assert create_asset(path="/asdf/fdsa") == Asset(uri="file:///asdf/fdsa") +@pytest.mark.parametrize( + ("path", "uri"), + ( + ("/asdf/fdsa", "file:///asdf/fdsa"), + ("file:///asdf/fdsa", "file:///asdf/fdsa"), + ("file://asdf/fdsa", "file://asdf/fdsa"), + ("file://127.0.0.1:8080/dir/file.csv", "file://127.0.0.1:8080/dir/file.csv"), + ), +) +def test_file_asset(path, uri): + assert create_asset(path=path) == Asset(uri=uri) @pytest.mark.parametrize( - ("uri", "ol_dataset"), + ("path", "ol_dataset"), ( - ("file:///valid/path", OpenLineageDataset(namespace="file", name="/valid/path")), + ("/valid/path", OpenLineageDataset(namespace="file", name="/valid/path")), ( "file://127.0.0.1:8080/dir/file.csv", OpenLineageDataset(namespace="file://127.0.0.1:8080", name="/dir/file.csv"), ), ("file:///C://dir/file", OpenLineageDataset(namespace="file", name="/C://dir/file")), + ("file://asdf.pdf", OpenLineageDataset(namespace="file", name="/asdf.pdf")), + ("file:///asdf.pdf", OpenLineageDataset(namespace="file", name="/asdf.pdf")), ), ) -def test_convert_asset_to_openlineage(uri, ol_dataset): - result = convert_asset_to_openlineage(Asset(uri=uri), None) +def test_convert_asset_to_openlineage(path, ol_dataset): + result = convert_asset_to_openlineage(create_asset(path=path), None) assert result == ol_dataset