Skip to content

Commit

Permalink
Refactor parts in file and url sources (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandorkertesz authored Feb 20, 2024
1 parent 9cc58e5 commit 1bb4c44
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 129 deletions.
80 changes: 18 additions & 62 deletions earthkit/data/sources/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,15 @@
from earthkit.data.core.caching import CACHE
from earthkit.data.decorators import detect_out_filename
from earthkit.data.readers import reader
from earthkit.data.utils.parts import check_urls_and_parts, ensure_urls_and_parts
from earthkit.data.utils.parts import PathAndParts

from . import Source

LOG = logging.getLogger(__name__)


class FileParts:
def __init__(self, path, parts):
self.path, self.parts = self._paths_and_parts(path, parts)

def is_empty(self):
return not (self.parts is not None and any(x is not None for x in self.parts))

def update(self, path):
if self.path != path:
self.path, self.parts = self._paths_and_parts(path, self.parts)
return self.path

def _paths_and_parts(self, paths, parts):
"""Preprocess paths and parts.
Parameters
----------
paths: str or list/tuple
The path(s). When it is a sequence either each
item is a path (str), or a pair of a path and :ref:`parts <parts>`.
parts: part,list/tuple of parts or None.
The :ref:`parts <parts>`.
Returns
-------
str or list of str
The path or paths.
SimplePart, list or tuple, None
The parts (one for each path). A part can be a single
SimplePart, a list/tuple of SimpleParts or None.
"""
if parts is None:
if isinstance(paths, str):
return paths, None
elif isinstance(paths, (list, tuple)) and all(
isinstance(p, str) for p in paths
):
return paths, [None] * len(paths)

paths = check_urls_and_parts(paths, parts)
paths_and_parts = ensure_urls_and_parts(paths, parts, compress=True)

paths, parts = zip(*paths_and_parts)
assert len(paths) == len(parts)
if len(paths) == 1:
return paths[0], parts[0]
else:
return paths, parts
class FileSourcePathAndParts(PathAndParts):
compress = False


class FileSourceMeta(type(Source), type(os.PathLike)):
Expand All @@ -90,18 +43,13 @@ def __init__(self, path=None, filter=None, merger=None, parts=None, **kwargs):
Source.__init__(self, **kwargs)
self.filter = filter
self.merger = merger
self._parts = FileParts(path, parts)
self.path = self._parts.path
self._path_and_parts = FileSourcePathAndParts(path, parts)

if self._kwargs.get("indexing", False):
if not self._parts.is_empty():
if not self._path_and_parts.is_empty():
raise ValueError("Cannot specify parts when indexing is enabled!")

def mutate(self):
# the initial path is reset for e.g. the retrievals. We have to ensure
# the parts are still correctly formed
self.check_parts()

if isinstance(self.path, (list, tuple)):
if len(self.path) == 1:
self.path = self.path[0]
Expand All @@ -110,7 +58,7 @@ def mutate(self):
"multi",
[
from_source("file", p, parts=part, **self._kwargs)
for p, part in zip(self.path, self._parts.parts)
for p, part in zip(self.path, self.parts)
],
filter=self.filter,
merger=self.merger,
Expand Down Expand Up @@ -145,9 +93,8 @@ def merge(cls, sources):
@property
def _reader(self):
if self._reader_ is None:
self.check_parts()
self._reader_ = reader(
self, self.path, content_type=self.content_type, parts=self._parts.parts
self, self.path, content_type=self.content_type, parts=self.parts
)
return self._reader_

Expand Down Expand Up @@ -246,8 +193,17 @@ def bounding_box(self):
def statistics(self, **kwargs):
return self._reader.statistics(**kwargs)

def check_parts(self):
self.path = self._parts.update(self.path)
@property
def path(self):
return self._path_and_parts.path

@path.setter
def path(self, v):
self._path_and_parts.update(v)

@property
def parts(self):
return self._path_and_parts.parts


class IndexedFileSource(FileSource):
Expand Down
72 changes: 21 additions & 51 deletions earthkit/data/sources/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from earthkit.data.core.settings import SETTINGS
from earthkit.data.core.statistics import record_statistics
from earthkit.data.utils import progress_bar
from earthkit.data.utils.parts import check_urls_and_parts, ensure_urls_and_parts
from earthkit.data.utils.parts import PathAndParts

from .file import FileSource

Expand Down Expand Up @@ -108,6 +108,10 @@ def download(target, _):
return path


class UrlSourcePathAndParts(PathAndParts):
compress = True


class UrlBase(FileSource):
def __init__(
self,
Expand All @@ -126,9 +130,8 @@ def __init__(
):
super().__init__(filter=filter, merger=merger)

self.url = url
self._url_and_parts = UrlSourcePathAndParts(url, parts)
self.chunk_size = chunk_size
self.parts = parts
self.http_headers = http_headers
self.auth = auth
self.verify = verify
Expand All @@ -137,11 +140,11 @@ def __init__(
self.stream = stream
self._kwargs = kwargs
LOG.debug(
f"url={self.url} parts={self.parts} auth={self.auth} _kwargs={self._kwargs}"
f"url={self.url} url_parts={self.url_parts} auth={self.auth} _kwargs={self._kwargs}"
)

def connect_to_mirror(self, mirror):
return mirror.connection_for_url(self, self.url, self.parts)
return mirror.connection_for_url(self, self.url, self.url_parts)

def prepare_headers(self, url):
headers = {}
Expand All @@ -156,6 +159,14 @@ def prepare_headers(self, url):

return headers

@property
def url(self):
return self._url_and_parts.path

@property
def url_parts(self):
return self._url_and_parts.parts

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.url})"

Expand Down Expand Up @@ -191,21 +202,18 @@ def __init__(
# TODO: re-enable this feature
extension = None

# put urls and parts into one list
self.urls_and_parts = self._urls_and_parts(self.url, self.parts)

if not self.stream:
self.update_if_out_of_date = update_if_out_of_date

LOG.debug(
(
f"urls_and_parts={self.urls_and_parts} auth={self.auth}) "
f"url={self.url} url_parts={self.url_parts} auth={self.auth}) "
f"http_headers={self.http_headers}"
f" _kwargs={self._kwargs}"
)
)
self.downloader = Downloader(
self.urls_and_parts,
self._url_and_parts.zipped(),
chunk_size=self.chunk_size,
timeout=SETTINGS.get("url-download-timeout"),
verify=self.verify,
Expand All @@ -219,8 +227,6 @@ def __init__(
download_file_extension=".download",
)

print(f"downloader={self.downloader}")

if extension and extension[0] != ".":
extension = "." + extension

Expand All @@ -240,7 +246,7 @@ def download(target, _):

self.path = self.cache_file(
download,
dict(url=self.urls_and_parts),
dict(url=self.url, parts=self.url_parts),
extension=extension,
force=force,
)
Expand All @@ -254,7 +260,7 @@ def mutate(self):
if self.stream:
# # create one stream source per url
s = []
for url, parts in self.urls_and_parts:
for url, parts in self._url_and_parts:
s.append(
SingleUrlStream(
url,
Expand All @@ -271,44 +277,8 @@ def mutate(self):

return _from_source(s, **self._kwargs)
else:
# the underlying file source also try to use parts! So
# it has to be cleared to avoid using it again on the
# downloaded file!
self.parts = None
return super().mutate()

@staticmethod
def _urls_and_parts(urls, parts):
"""Preprocess urls and parts.
Parameters
----------
urls: str or list/tuple
The url(s). When it is a sequence either each
item is a url (str), or a pair of a url and :ref:`parts <parts>`.
parts: part,list/tuple of parts or None.
The :ref:`parts <parts>`.
Returns
-------
list
Each item is a pair of url and part in a format expected by
``multiurl``. A part can be a
SimplePart, list/tuple of SimpleParts or None.
"""
if parts is None:
if isinstance(urls, str):
return [[urls, None]]
elif isinstance(urls, (list, tuple)) and all(
isinstance(p, str) for p in urls
):
return [(u, None) for u in urls]

urls = check_urls_and_parts(urls, parts)
urls_and_parts = ensure_urls_and_parts(urls, parts, compress=False)
return urls_and_parts

def out_of_date(self, url, path, cache_data):
if SETTINGS.get("check-out-of-date-urls") is False:
return False
Expand Down Expand Up @@ -473,7 +443,7 @@ def to_stream(self):
downloader = Downloader(
self.url,
chunk_size=self.chunk_size,
parts=self.parts,
parts=self.url_parts,
timeout=SETTINGS.get("url-download-timeout"),
verify=self.verify,
range_method=self.range_method,
Expand Down
65 changes: 65 additions & 0 deletions earthkit/data/utils/parts.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ def check_urls_and_parts(urls, parts):
def _ensure_parts(parts):
if parts is None:
return None
if parts == [None]:
return None
if len(parts) == 2 and isinstance(parts[0], int) and isinstance(parts[1], int):
parts = [parts]
parts = [SimplePart(offset, length) for offset, length in parts]
Expand Down Expand Up @@ -149,3 +151,66 @@ def compress_parts(parts):
result.append((offset, length))
last = offset + length
return tuple(SimplePart(offset, length) for offset, length in result)


class PathAndParts:
compress = None

def __init__(self, path, parts):
self.path, self.parts = self._parse(path, parts)

def is_empty(self):
return not (self.parts is not None and any(x is not None for x in self.parts))

def update(self, path):
if self.path != path:
self.path, self.parts = self._parse(path, self.parts)

def _parse(self, paths, parts):
"""Preprocess paths and parts.
Parameters
----------
paths: str or list/tuple
The path(s). When it is a sequence either each
item is a path (str), or a pair of a path and :ref:`parts <parts>`.
parts: part,list/tuple of parts or None.
The :ref:`parts <parts>`.
Returns
-------
str or list of str
The path or paths.
SimplePart, list or tuple, None
The parts (one for each path). A part can be a single
SimplePart, a list/tuple of SimpleParts or None.
"""
if parts is None:
if isinstance(paths, str):
return paths, None
elif isinstance(paths, (list, tuple)) and all(
isinstance(p, str) for p in paths
):
return paths, [None] * len(paths)

paths = check_urls_and_parts(paths, parts)
paths_and_parts = ensure_urls_and_parts(paths, parts, compress=self.compress)

paths, parts = zip(*paths_and_parts)
assert len(paths) == len(parts)
if len(paths) == 1:
return paths[0], parts[0]
else:
return paths, parts

def zipped(self):
return [(pt, pr) for pt, pr in self]

def __iter__(self):
path = self.path
parts = self.parts
if isinstance(self.path, str):
path = [self.path]
parts = [self.parts]
return zip(path, parts)
2 changes: 1 addition & 1 deletion tests/grib/test_grib_url_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def test_grib_single_url_stream_parts(path, parts, expected_meta):
),
],
)
def test_grib_single_url_stream_parts_as_arg(parts, expected_meta):
def test_grib_single_url_stream_parts_as_arg_valid(parts, expected_meta):
ds = from_source(
"url",
[earthkit_remote_test_data_file("examples/test6.grib"), parts],
Expand Down
Loading

0 comments on commit 1bb4c44

Please sign in to comment.