Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@


def create_asset(*, path: str, extra=None) -> Asset:
    """
    Build an ``Asset`` whose URI uses the ``file://`` scheme.

    :param path: Location of the file. It may be given bare or already
        prefixed with ``file://``; a leading ``file://`` is stripped before the
        scheme is re-applied, so the prefix is never doubled.
    :param extra: Passed through unchanged as the ``extra`` argument of ``Asset``.
    :return: ``Asset`` with ``uri`` set to ``file://`` followed by the path.
    """
    scheme_prefix = "file://"
    # No validation happens here: once the optional scheme prefix is removed,
    # the remainder is assumed to be an absolute path starting with "/".
    bare_path = path[len(scheme_prefix) :] if path.startswith(scheme_prefix) else path
    return Asset(uri=scheme_prefix + bare_path, extra=extra)


Expand All @@ -52,6 +53,13 @@ def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDa
from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset

parsed = urllib.parse.urlsplit(asset.uri)

# Non-remote path
if parsed.path == "/":
netloc = parsed.netloc
if not netloc.startswith("/"):
netloc = "/" + netloc
return OpenLineageDataset(namespace="file", name=netloc)
return OpenLineageDataset(
namespace=f"file://{parsed.netloc}" if parsed.netloc else "file", name=parsed.path
)
23 changes: 17 additions & 6 deletions providers/common/io/tests/unit/common/io/assets/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,32 @@ def test_sanitize_uri_invalid(uri):
sanitize_uri(urlsplit(uri))


def test_file_asset():
    """Check that an absolute path is turned into a ``file://`` asset URI."""
    created = create_asset(path="/asdf/fdsa")
    assert created == Asset(uri="file:///asdf/fdsa")
@pytest.mark.parametrize(
    "path, uri",
    [
        # Bare absolute path: the file:// scheme is prepended.
        ("/asdf/fdsa", "file:///asdf/fdsa"),
        # Input already carrying the file:// scheme: expected URI equals the input.
        ("file:///asdf/fdsa", "file:///asdf/fdsa"),
        ("file://asdf/fdsa", "file://asdf/fdsa"),
        ("file://127.0.0.1:8080/dir/file.csv", "file://127.0.0.1:8080/dir/file.csv"),
    ],
)
def test_file_asset(path, uri):
    """Check that ``create_asset`` yields an ``Asset`` with the expected URI for each input path."""
    produced = create_asset(path=path)
    assert produced == Asset(uri=uri)


@pytest.mark.parametrize(
("uri", "ol_dataset"),
("path", "ol_dataset"),
(
("file:///valid/path", OpenLineageDataset(namespace="file", name="/valid/path")),
("/valid/path", OpenLineageDataset(namespace="file", name="/valid/path")),
(
"file://127.0.0.1:8080/dir/file.csv",
OpenLineageDataset(namespace="file://127.0.0.1:8080", name="/dir/file.csv"),
),
("file:///C://dir/file", OpenLineageDataset(namespace="file", name="/C://dir/file")),
("file://asdf.pdf", OpenLineageDataset(namespace="file", name="/asdf.pdf")),
("file:///asdf.pdf", OpenLineageDataset(namespace="file", name="/asdf.pdf")),
),
)
def test_convert_asset_to_openlineage(uri, ol_dataset):
result = convert_asset_to_openlineage(Asset(uri=uri), None)
def test_convert_asset_to_openlineage(path, ol_dataset):
result = convert_asset_to_openlineage(create_asset(path=path), None)
assert result == ol_dataset