Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add and use file system abstraction in file source #8415

Merged
merged 17 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,11 @@
"file = datahub.ingestion.reporting.file_reporter:FileReporter",
],
"datahub.custom_packages": [],
"datahub.fs.plugins": [
"s3 = datahub.ingestion.fs.s3_fs:S3FileSystem",
"file = datahub.ingestion.fs.local_fs:LocalFileSystem",
"http = datahub.ingestion.fs.http_fs:HttpFileSystem",
],
}


Expand Down
Empty file.
40 changes: 40 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable
from urllib import parse


@dataclass
class FileInfo:
    """Metadata for a single file-system entry.

    path is the full (possibly scheme-qualified) location, size is the byte
    count (0 for non-files), and is_file distinguishes regular files from
    directories/prefixes.
    """

    path: str
    size: int
    is_file: bool

    def __str__(self):
        # Keep the historical rendering: "FileInfo(<path>, <size>, <is_file>)".
        return "FileInfo({}, {}, {})".format(self.path, self.size, self.is_file)


class FileSystem(metaclass=ABCMeta):
    """Abstract interface for reading files from a storage backend.

    Concrete implementations (local disk, S3, HTTP, ...) are looked up by
    URI scheme and constructed through their ``create`` factory.
    """

    @classmethod
    def create(cls, **kwargs: Any) -> "FileSystem":
        # Base implementation always raises; each backend must supply its own
        # factory.  NOTE(review): deliberately not @abstractmethod, so a missing
        # override fails at call time rather than at instantiation.
        raise NotImplementedError('File system implementations must implement "create"')

    @abstractmethod
    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for reading and return a file-like object."""
        pass

    @abstractmethod
    def file_status(self, path: str) -> FileInfo:
        """Return metadata (path, size, is_file) for *path*."""
        pass

    @abstractmethod
    def list(self, path: str) -> Iterable[FileInfo]:
        """Return the entries under *path*."""
        pass


def get_path_schema(path: str) -> str:
    """Return the URI scheme of *path*, defaulting to "file" for local paths."""
    parsed_scheme = parse.urlparse(path).scheme
    # An empty scheme means the path carries no "xyz://" prefix, i.e. it is a
    # plain local path, which maps to the "file" implementation.
    return parsed_scheme if parsed_scheme else "file"
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.fs.fs_base import FileSystem

# Global registry of FileSystem implementations, keyed by URI scheme
# (e.g. "s3", "file", "http").  Implementations are discovered through the
# "datahub.fs.plugins" entry point group declared in setup.py.
fs_registry = PluginRegistry[FileSystem]()
fs_registry.register_from_entrypoint("datahub.fs.plugins")
28 changes: 28 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Any, Iterable

import requests
import smart_open

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class HttpFileSystem(FileSystem):
    """A FileSystem backed by plain HTTP(S) endpoints.

    Reads go through smart_open; metadata comes from a HEAD request.  HTTP has
    no notion of directories, so list() only ever yields the path itself.
    """

    # Bound for the HEAD metadata request (seconds).  Without a timeout a
    # stalled server would hang ingestion indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    @classmethod
    def create(cls, **kwargs):
        """Create an HttpFileSystem; no configuration options are supported."""
        return HttpFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading; kwargs become smart_open transport params."""
        return smart_open.open(path, mode="rb", transport_params=kwargs)

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for *path* via an HTTP HEAD request.

        Raises:
            FileNotFoundError: if the server responds 404.
            IOError: for any other non-success response.
        """
        head = requests.head(path, timeout=self._REQUEST_TIMEOUT_SECONDS)
        if head.ok:
            # Header lookup is case-insensitive in requests; a server that
            # omits Content-Length will raise KeyError here.
            return FileInfo(path, int(head.headers["Content-length"]), is_file=True)
        elif head.status_code == 404:
            raise FileNotFoundError(f"Requested path {path} does not exist.")
        else:
            raise IOError(f"Cannot get file status for the requested path {path}.")

    def list(self, path: str) -> Iterable[FileInfo]:
        """HTTP cannot enumerate children, so the listing is just *path* itself."""
        status = self.file_status(path)
        return [status]
29 changes: 29 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
import pathlib
from typing import Any, Iterable

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class LocalFileSystem(FileSystem):
    """A FileSystem implementation for paths on the local disk."""

    @classmethod
    def create(cls, **kwargs):
        """Create a LocalFileSystem; no configuration options are supported."""
        return LocalFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading.

        Raises:
            ValueError: if any kwargs are supplied — the local file system
                supports no transport options.
        """
        if kwargs:
            # Explicit raise instead of `assert` so the check survives `python -O`.
            raise ValueError(f"Local file system does not support kwargs: {kwargs}")
        return pathlib.Path(path).open(mode="rb")

    def list(self, path: str) -> Iterable[FileInfo]:
        """Yield FileInfo for *path* itself if it is a file, else for each direct child."""
        p = pathlib.Path(path)
        if p.is_file():
            return [self.file_status(path)]
        return iter([self.file_status(str(child)) for child in p.iterdir()])

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for *path*; non-files (dirs, missing paths) report size 0."""
        p = pathlib.Path(path)
        if p.is_file():
            # Path.stat().st_size matches os.path.getsize; keeps the class
            # consistently on pathlib instead of mixing in os.path.
            return FileInfo(path, p.stat().st_size, is_file=True)
        return FileInfo(path, 0, is_file=False)
108 changes: 108 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any, Iterable
from urllib.parse import urlparse

import boto3
import smart_open

from datahub.ingestion.fs import s3_fs
from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


def parse_s3_path(path: str) -> "S3Path":
    """Split an s3://bucket/key URI into its bucket and key components."""
    parts = urlparse(path)
    # The netloc is the bucket name; the key is the URI path without its
    # leading slash.
    return S3Path(parts.netloc, parts.path.lstrip("/"))


def assert_ok_status(s3_response):
    """Fail fast if an S3 API response does not carry an HTTP 200 status."""
    status_code = s3_response["ResponseMetadata"]["HTTPStatusCode"]
    assert (
        status_code == 200
    ), f"Failed to fetch S3 object, error message: {s3_response['Error']['Message']}"


@dataclass
class S3Path:
    """Bucket/key pair identifying an object (or prefix) in S3."""

    bucket: str
    key: str

    def __str__(self):
        # Keep the historical rendering: "S3Path(<bucket>, <key>)".
        return "S3Path({}, {})".format(self.bucket, self.key)


class S3ListIterator(Iterator):
    """Iterator over the FileInfo entries under an S3 prefix.

    Wraps list_objects_v2 and transparently follows continuation tokens,
    fetching up to `max_keys` objects per request.
    """

    MAX_KEYS = 1000

    def __init__(
        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
    ) -> None:
        self._s3 = s3_client
        self._bucket = bucket
        self._prefix = prefix
        self._max_keys = max_keys
        # Current page of results; replaced by each fetch().
        self._file_statuses: Iterator = iter([])
        # Continuation token from the previous response; falsy once exhausted.
        self._token = ""
        self.fetch()

    def __next__(self) -> FileInfo:
        try:
            return next(self._file_statuses)
        except StopIteration:
            # Current page is exhausted; pull the next one only if S3
            # indicated that more results remain.
            if not self._token:
                raise StopIteration()
            self.fetch()
            return next(self._file_statuses)

    def fetch(self):
        request = {
            "Bucket": self._bucket,
            "Prefix": self._prefix,
            "MaxKeys": self._max_keys,
        }
        if self._token:
            request["ContinuationToken"] = self._token

        response = self._s3.list_objects_v2(**request)

        s3_fs.assert_ok_status(response)

        entries = [
            FileInfo(f"s3://{response['Name']}/{obj['Key']}", obj["Size"], is_file=True)
            for obj in response.get("Contents", [])
        ]
        self._file_statuses = iter(entries)
        # Absent (None) when this was the last page.
        self._token = response.get("NextContinuationToken")


class S3FileSystem(FileSystem):
    """A FileSystem implementation backed by Amazon S3."""

    def __init__(self, **kwargs):
        # kwargs are forwarded verbatim to boto3 (credentials, region,
        # endpoint_url, ...).
        self.s3 = boto3.client("s3", **kwargs)

    @classmethod
    def create(cls, **kwargs):
        """Create an S3FileSystem; kwargs are passed through to boto3.client."""
        return S3FileSystem(**kwargs)

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open an s3:// *path* for binary reading via smart_open.

        Any kwargs are forwarded as additional smart_open transport params.
        """
        # BUG FIX: dict.update() returns None, so the previous code passed
        # transport_params=None and silently dropped the configured client.
        transport_params = {**kwargs, "client": self.s3}
        return smart_open.open(path, mode="rb", transport_params=transport_params)

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for an s3:// *path*.

        A missing object is reported as FileInfo(path, 0, is_file=False);
        any other S3 error is re-raised.
        """
        s3_path = parse_s3_path(path)
        try:
            response = self.s3.get_object_attributes(
                Bucket=s3_path.bucket, Key=s3_path.key, ObjectAttributes=["ObjectSize"]
            )
            assert_ok_status(response)
            return FileInfo(path, response["ObjectSize"], is_file=True)
        except Exception as e:
            # botocore's ClientError carries the HTTP status in e.response;
            # treat 404 as "does not exist" rather than an error.
            if (
                hasattr(e, "response")
                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
            ):
                return FileInfo(path, 0, is_file=False)
            else:
                raise e

    def list(self, path: str) -> Iterable[FileInfo]:
        """List all objects under *path* (treated as a prefix), paging as needed."""
        s3_path = parse_s3_path(path)
        return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
Loading
Loading