Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -175,53 +175,68 @@ def _extract_openlineage_unique_dataset_paths(

>>> results = [{"file": "azure://my_account.blob.core.windows.net/azure_container/dir3/file.csv"}]
>>> method(results)
([('wasbs://azure_container@my_account', 'dir3')], [])
([('wasbs://azure_container@my_account', 'dir3/file.csv')], [])

>>> results = [{"file": "azure://my_account.blob.core.windows.net/azure_container"}]
>>> method(results)
([('wasbs://azure_container@my_account', '/')], [])

>>> results = [{"file": "s3://bucket"}, {"file": "gcs://bucket/"}, {"file": "s3://bucket/a.csv"}]
>>> method(results)
([('gcs://bucket', '/'), ('s3://bucket', '/')], [])
([('gcs://bucket', '/'), ('s3://bucket', '/'), ('s3://bucket', 'a.csv')], [])

>>> results = [{"file": "s3://bucket/dir/file.csv"}, {"file": "gcs://bucket/dir/dir2/a.txt"}]
>>> method(results)
([('gcs://bucket', 'dir/dir2'), ('s3://bucket', 'dir')], [])
([('gcs://bucket', 'dir/dir2/a.txt'), ('s3://bucket', 'dir/file.csv')], [])

>>> results = [
... {"file": "s3://bucket/dir/file.csv"},
... {"file": "azure://my_account.something_new.windows.net/azure_container"},
... ]
>>> method(results)
([('s3://bucket', 'dir')], ['azure://my_account.something_new.windows.net/azure_container'])
([('s3://bucket', 'dir/file.csv')], ['azure://my_account.something_new.windows.net/azure_container'])

>>> results = [
... {"file": "s3://bucket/dir/file.csv"},
... {"file": "s3:/invalid-s3-uri"},
... {"file": "gcs:invalid-gcs-uri"},
... ]
>>> method(results)
([('s3://bucket', 'dir/file.csv')], ['gcs:invalid-gcs-uri', 's3:/invalid-s3-uri'])
"""
import re
from pathlib import Path
from urllib.parse import urlparse

azure_regex = r"azure:\/\/(\w+)?\.blob.core.windows.net\/(\w+)\/?(.*)?"
extraction_error_files = []
unique_dataset_paths = set()

for row in query_result:
uri = urlparse(row["file"])
if uri.scheme == "azure":
match = re.fullmatch(azure_regex, row["file"])
if not match:
try:
uri = urlparse(row["file"])

# Check for valid URI structure
if not uri.scheme or not uri.netloc:
extraction_error_files.append(row["file"])
continue
account_name, container_name, name = match.groups()
namespace = f"wasbs://{container_name}@{account_name}"
else:
namespace = f"{uri.scheme}://{uri.netloc}"
name = uri.path.lstrip("/")

name = Path(name).parent.as_posix()
if name in ("", "."):
name = "/"
if uri.scheme == "azure":
match = re.fullmatch(azure_regex, row["file"])
if not match:
extraction_error_files.append(row["file"])
continue
account_name, container_name, name = match.groups()
namespace = f"wasbs://{container_name}@{account_name}"
else:
namespace = f"{uri.scheme}://{uri.netloc}"
name = uri.path.lstrip("/")

if name in ("", "."):
name = "/"

unique_dataset_paths.add((namespace, name))
unique_dataset_paths.add((namespace, name))
except Exception:
extraction_error_files.append(row["file"])

return sorted(unique_dataset_paths), sorted(extraction_error_files)

Expand All @@ -243,9 +258,11 @@ def get_openlineage_facets_on_complete(self, task_instance):
return OperatorLineage()

query_results = self._result or []
# If no files were uploaded we get [{"status": "0 files were uploaded..."}]
if len(query_results) == 1 and query_results[0].get("status"):

# This typically happens when no files were processed (empty directory)
if len(query_results) == 1 and ("file" not in query_results[0] or query_results[0]["file"] is None):
query_results = []

unique_dataset_paths, extraction_error_files = self._extract_openlineage_unique_dataset_paths(
query_results
)
Expand Down
Loading