Skip to content

Commit

Permalink
Add stream support for file source
Browse files: browse the repository at this point in the history
  • Loading branch information
sandorkertesz committed Oct 29, 2024
1 parent 5461777 commit 41e2b00
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/earthkit/data/readers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def reader(source, path, **kwargs):
if os.path.isdir(path):
from .directory import DirectoryReader

return DirectoryReader(source, path, **kwargs).mutate()
return DirectoryReader(source, path).mutate()
LOG.debug("Reader for %s", path)

if not os.path.exists(path):
Expand Down
14 changes: 1 addition & 13 deletions src/earthkit/data/readers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def make_file_filter(filter, top):


class DirectoryReader(Reader):
def __init__(self, source, path, **kwargs):
def __init__(self, source, path):
super().__init__(source, path)
self._content = []

Expand Down Expand Up @@ -94,18 +94,6 @@ def write(self, f, **kwargs):
raise NotImplementedError()


# class DirectoryStreamReader(Reader):
# def __init__(self, source, content):
# super().__init__(source)
# self._content = content

# def mutate_source(self):
# if len(self._content) == 1:
# return from_source("file", self._content[0], stream=True, **self.source._kwargs)
# else:
# return from_source("file", sorted(self._content), stream=True, **self.source._kwargs)


def reader(source, path, *, magic=None, deeper_check=False, **kwargs):
if magic is None and os.path.isdir(path):
return DirectoryReader(source, path)
3 changes: 1 addition & 2 deletions src/earthkit/data/sources/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def mutate(self):
if len(self.path) == 1:
self.path = self.path[0]
else:
r = from_source(
return from_source(
"multi",
[
from_source("file", p, parts=part, filter=self.filter, **self._kwargs)
Expand All @@ -75,7 +75,6 @@ def mutate(self):
filter=self.filter,
merger=self.merger,
)
return r

# here we must have a file or a directory
if self._kwargs.get("indexing", False):
Expand Down
11 changes: 6 additions & 5 deletions src/earthkit/data/utils/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ def __init__(self, path, parts, chunk_size=1024 * 1024):
def __iter__(self):
yield from self._iter_chunks()

def __iter_full_parts(self):
with open(self.path, "rb") as f:
for offset, length in self.parts:
f.seek(offset)
yield f.read(length)
# TODO: consider offering this method as an alternative to _iter_chunks (it reads each part with a single read)
# def __iter_full_parts(self):
# with open(self.path, "rb") as f:
# for offset, length in self.parts:
# f.seek(offset)
# yield f.read(length)

def _iter_chunks(self):
with open(self.path, "rb") as f:
Expand Down

0 comments on commit 41e2b00

Please sign in to comment.