Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fixes for handling fetch_range extending beyond length of the file #247

Merged
merged 15 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions adlfs/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -1709,7 +1709,10 @@ def __init__(
self.details = self.fs.info(self.path)
self.size = self.details["size"]
self.cache = caches[cache_type](
self.blocksize, self._fetch_range, self.size, **cache_options
blocksize = self.blocksize,
fetcher = self._fetch_range,
size = self.size,
**cache_options
)
self.metadata = sync(
self.loop, get_blob_metadata, self.container_client, self.blob
Expand Down Expand Up @@ -1769,20 +1772,23 @@ def connect_client(self):
f"Unable to fetch container_client with provided params for {e}!!"
)

async def _async_fetch_range(self, start: int, end: int, **kwargs):
async def _async_fetch_range(self, start: int, length: int = None, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is ultimately overriding _fetch_range in fsspec.spec.AbstractBufferedFile right? If so, I guess the function signature needs to be kept start and end here, since that is what fsspec assumes?

"""
Download a chunk of data specified by start and end
Download a chunk of data specified by start and length
hayesgb marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
start: int
Start byte position to download blob from
end: int
End byte position to download blob from
length: int
Length to download
"""
if length is not None:
if start + length > self.size:
length = self.size - start
async with self.container_client:
stream = await self.container_client.download_blob(
blob=self.blob, offset=start, length=end
blob=self.blob, offset=start, length=length
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and then this line instead simply becomes

Suggested change
blob=self.blob, offset=start, length=length
blob=self.blob, offset=start, length=end-start

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @anders-kiaer. I appreciate the feedback, and the confirmation that this issue resolves the challenge cited in #57.

I'll align the params in fetch_range to fsspec.AbstractBufferedFile, but I also want to account for the situation where end > self.size, and also for the eventuality that length is None, which is valid for the Azure SDK. Can you take a look at this branch and provide feedback on performance?

)
blob = await stream.readall()
return blob
Expand Down
49 changes: 49 additions & 0 deletions adlfs/tests/test_fetch_range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest

from adlfs import AzureBlobFileSystem, AzureBlobFile


URL = "http://127.0.0.1:10000"
ACCOUNT_NAME = "devstoreaccount1"
KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # NOQA
CONN_STR = f"DefaultEndpointsProtocol=http;AccountName={ACCOUNT_NAME};AccountKey={KEY};BlobEndpoint={URL}/{ACCOUNT_NAME};" # NOQA


def test_fetch_entire_blob(storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR,
)
blob = fs.open("data/top_file.txt")
assert len(blob._fetch_range(start=0, length=10)) == 10


def test_fetch_first_half(storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR,
)
blob = fs.open("data/top_file.txt")
assert len(blob._fetch_range(start=0, length=5)) == 5


def test_fetch_second_half(storage):
# Verify if length extends beyond the end of file, truncate the read
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR,
)
blob = fs.open("data/top_file.txt")
assert len(blob._fetch_range(start=5, length=10)) == 5


def test_fetch_middle(storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR,
)
blob = fs.open("data/top_file.txt")
assert len(blob._fetch_range(start=2, length=7)) == 7

def test_fetch_length_is_none(storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR,
)
blob = fs.open("data/top_file.txt")
assert len(blob._fetch_range(start=2, length=None)) == 8