resume downloads automatically #603

Merged · 9 commits · Apr 12, 2024

8 changes: 8 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,14 @@
Release History
---------------

4.0.0 (2024-04-12)
++++++++++++++++++

**Features and Improvements**

- Partially downloaded files will now automatically resume where they left off when retried.
- Use ``Last-Modified`` header to set all mtimes (this includes files.xml now).
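
A minimal, self-contained sketch of the resume behavior described above
(hypothetical example code, not the library's implementation; it assumes a
server that honors ``Range`` requests)::

    import os
    import requests

    def resume_download(url, path, chunk_size=1048576, timeout=12):
        headers = {}
        mode = 'wb'
        if os.path.exists(path):
            # Ask the server for only the bytes we don't have yet,
            # and append them to the partial file.
            headers['Range'] = f'bytes={os.path.getsize(path)}-'
            mode = 'ab'
        with requests.get(url, stream=True, headers=headers, timeout=timeout) as r:
            if r.status_code == 416:  # Range not satisfiable: file is complete
                return
            r.raise_for_status()
            with open(path, mode) as f:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)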

3.7.0 (2024-03-19)
++++++++++++++++++

2 changes: 1 addition & 1 deletion internetarchive/__version__.py
@@ -1 +1 @@
__version__ = '3.7.0'
__version__ = '4.0.0.dev1'
11 changes: 10 additions & 1 deletion internetarchive/exceptions.py
@@ -31,7 +31,16 @@ class AuthenticationError(Exception):

class ItemLocateError(Exception):
    def __init__(self, *args, **kwargs):
        default_message = 'Item cannot be located because it is dark or does not exist.'
        default_message = "Item cannot be located because it is dark or does not exist."
        if args or kwargs:
            super().__init__(*args, **kwargs)
        else:
            super().__init__(default_message)


class InvalidChecksumError(Exception):
    def __init__(self, *args, **kwargs):
        default_message = "File corrupt, checksums do not match."
        if args or kwargs:
            super().__init__(*args, **kwargs)
        else:
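
A usage sketch for the new ``InvalidChecksumError`` (hypothetical caller
code; it mirrors how the test suite below exercises the exception):

    from internetarchive import get_item
    from internetarchive.exceptions import InvalidChecksumError

    item = get_item('nasa')
    try:
        item.download(files='nasa_meta.xml', checksum=True)
    except InvalidChecksumError as exc:
        # Raised when a downloaded file's md5 does not match the expected checksum.
        print(f'download corrupt, consider retrying: {exc}')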
212 changes: 131 additions & 81 deletions internetarchive/files.py
@@ -29,6 +29,7 @@
import sys
from contextlib import nullcontext, suppress
from email.utils import parsedate_to_datetime
from time import sleep
from urllib.parse import quote

from requests.exceptions import (
@@ -40,7 +41,7 @@
)
from tqdm import tqdm

from internetarchive import auth, iarequest, utils
from internetarchive import auth, exceptions, iarequest, utils

log = logging.getLogger(__name__)

@@ -137,7 +138,8 @@ def __repr__(self):
                f'size={self.size!r}, '
                f'format={self.format!r})')

    def download(self, file_path=None, verbose=None, ignore_existing=None,
    def download(# noqa: max-complexity=38
            self, file_path=None, verbose=None, ignore_existing=None,
            checksum=None, destdir=None, retries=None, ignore_errors=None,
            fileobj=None, return_responses=None, no_change_timestamp=None,
            params=None, chunk_size=None, stdout=None, ors=None,
@@ -205,6 +207,9 @@ def download(self, file_path=None, verbose=None, ignore_existing=None,
        no_change_timestamp = no_change_timestamp or False
        params = params or None
        timeout = 12 if not timeout else timeout
        headers = {}
        retries_sleep = 3  # TODO: exponential sleep
        retrying = False  # for retry loop

        self.item.session.mount_http_adapter(max_retries=retries)
        file_path = file_path or self.name
@@ -220,93 +225,138 @@
            file_path = os.path.join(destdir, file_path)

        parent_dir = os.path.dirname(file_path)
        try:
            if parent_dir != '' and return_responses is not True:
                os.makedirs(parent_dir, exist_ok=True)

            response = self.item.session.get(self.url,
                                             stream=True,
                                             timeout=timeout,
                                             auth=self.auth,
                                             params=params)

            # Get timestamp from Last-Modified header
            dt = parsedate_to_datetime(response.headers['Last-Modified'])
            last_mod_mtime = dt.timestamp()

            response.raise_for_status()

            # Check if we should skip...
            if not return_responses and os.path.exists(file_path.encode('utf-8')):
                if ignore_existing:
                    msg = f'skipping {file_path}, file already exists.'
                    log.info(msg)
                    if verbose:
                        print(f' {msg}', file=sys.stderr)
                    return
                elif checksum:
                    with open(file_path, 'rb') as fp:
                        md5_sum = utils.get_md5(fp)

                    if md5_sum == self.md5:
                        msg = f'skipping {file_path}, file already exists based on checksum.'

        # Retry loop
        while True:
            try:
                if parent_dir != '' and return_responses is not True:
                    os.makedirs(parent_dir, exist_ok=True)

                if not return_responses \
                        and not ignore_existing \
                        and self.name != f'{self.identifier}_files.xml' \
                        and os.path.exists(file_path.encode('utf-8')):
                    st = os.stat(file_path.encode('utf-8'))
                    if st.st_size != self.size and not checksum:
                        headers = {"Range": f"bytes={st.st_size}-{self.size}"}

                response = self.item.session.get(self.url,
                                                 stream=True,
                                                 timeout=timeout,
                                                 auth=self.auth,
                                                 params=params,
                                                 headers=headers)

                # Get timestamp from Last-Modified header
                last_mod_header = response.headers.get('Last-Modified')
                if last_mod_header:
                    dt = parsedate_to_datetime(last_mod_header)
                    last_mod_mtime = dt.timestamp()
                else:
                    last_mod_mtime = 0

                response.raise_for_status()

                # Check if we should skip...
                if not return_responses and os.path.exists(file_path.encode('utf-8')):
                    if ignore_existing:
                        msg = f'skipping {file_path}, file already exists.'
                        log.info(msg)
                        if verbose:
                            print(f' {msg}', file=sys.stderr)
                        return
                    elif not fileobj:
                        st = os.stat(file_path.encode('utf-8'))
                        if st.st_mtime == last_mod_mtime:
                            if self.name == f'{self.identifier}_files.xml' \
                                    or (st.st_size == self.size):
                                msg = (f'skipping {file_path}, file already exists based on '
                                       'length and date.')
                    elif checksum:
                        with open(file_path, 'rb') as fp:
                            md5_sum = utils.get_md5(fp)

                        if md5_sum == self.md5:
                            msg = f'skipping {file_path}, file already exists based on checksum.'
                            log.info(msg)
                            if verbose:
                                print(f' {msg}', file=sys.stderr)
                            return

            elif return_responses:
                return response

            if verbose:
                total = int(response.headers.get('content-length', 0)) or None
                progress_bar = tqdm(desc=f' downloading {self.name}',
                                    total=total,
                                    unit='iB',
                                    unit_scale=True,
                                    unit_divisor=1024)
            else:
                progress_bar = nullcontext()

            if not chunk_size:
                chunk_size = 1048576
            if stdout:
                fileobj = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
            if not fileobj:
                fileobj = open(file_path.encode('utf-8'), 'wb')

            with fileobj, progress_bar as bar:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        size = fileobj.write(chunk)
                        if bar is not None:
                            bar.update(size)
                if ors:
                    fileobj.write(os.environ.get("ORS", "\n").encode("utf-8"))
        except (RetryError, HTTPError, ConnectTimeout, OSError, ReadTimeout) as exc:
            msg = f'error downloading file {file_path}, exception raised: {exc}'
            log.error(msg)
            try:
                os.remove(file_path)
            except OSError:
                pass
            if verbose:
                print(f' {msg}', file=sys.stderr)
            if ignore_errors:
                return False
            else:
                raise exc
                elif not fileobj:
                    st = os.stat(file_path.encode('utf-8'))
                    if st.st_mtime == last_mod_mtime:
                        if self.name == f'{self.identifier}_files.xml' \
                                or (st.st_size == self.size):
                            msg = (f'skipping {file_path}, file already exists based on '
                                   'length and date.')
                            log.info(msg)
                            if verbose:
                                print(f' {msg}', file=sys.stderr)
                            return

                elif return_responses:
                    return response

                if verbose:
                    total = int(response.headers.get('content-length', 0)) or None
                    progress_bar = tqdm(desc=f' downloading {self.name}',
                                        total=total,
                                        unit='iB',
                                        unit_scale=True,
                                        unit_divisor=1024)
                else:
                    progress_bar = nullcontext()

                if not chunk_size:
                    chunk_size = 1048576
                if stdout:
                    fileobj = os.fdopen(sys.stdout.fileno(), 'wb', closefd=False)
                if not fileobj or retrying:
                    if 'Range' in headers:
                        fileobj = open(file_path.encode('utf-8'), 'ab')
                    else:
                        fileobj = open(file_path.encode('utf-8'), 'wb')

                with fileobj, progress_bar as bar:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:
                            size = fileobj.write(chunk)
                            if bar is not None:
                                bar.update(size)
                    if ors:
                        fileobj.write(os.environ.get("ORS", "\n").encode("utf-8"))

                if 'Range' in headers:
                    with open(file_path, 'rb') as fh:
                        local_checksum = utils.get_md5(fh)
                    try:
                        assert local_checksum == self.md5
                    except AssertionError:
                        msg = (f"\"{file_path}\" corrupt, "
                               "checksums do not match. "
                               "Remote file may have been modified, "
                               "retry download.")
                        os.remove(file_path.encode('utf-8'))
                        raise exceptions.InvalidChecksumError(msg)
                break
            except (RetryError, HTTPError, ConnectTimeout, OSError, ReadTimeout,
                    exceptions.InvalidChecksumError) as exc:
                if retries > 0:
                    retrying = True
                    retries -= 1
                    msg = ('download failed, sleeping for '
                           f'{retries_sleep} seconds and retrying. '
                           f'{retries} retries left.')
                    log.warning(msg)
                    sleep(retries_sleep)
                    continue
                msg = f'error downloading file {file_path}, exception raised: {exc}'
                log.error(msg)
                try:
                    os.remove(file_path)
                except OSError:
                    pass
                if verbose:
                    print(f' {msg}', file=sys.stderr)
                if ignore_errors:
                    return False
                else:
                    raise exc

        # Set mtime with timestamp from Last-Modified header
        if not no_change_timestamp:
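
The retry loop above sleeps a fixed ``retries_sleep = 3`` seconds between
attempts; the in-code TODO notes exponential backoff as a possible
refinement. A sketch of that pattern (hypothetical, not part of this PR):

    from time import sleep

    def backoff_sleep(attempt, base=3, cap=60):
        # 3, 6, 12, 24, ... seconds, capped so worst-case waits stay bounded.
        sleep(min(base * (2 ** attempt), cap))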
27 changes: 8 additions & 19 deletions tests/test_item.py
Expand Up @@ -11,6 +11,7 @@
import internetarchive.files
from internetarchive import get_session
from internetarchive.api import get_item
from internetarchive.exceptions import InvalidChecksumError
from internetarchive.utils import InvalidIdentifierException, json, norm_filepath
from tests.conftest import (
    NASA_METADATA_PATH,
@@ -151,6 +152,7 @@ def test_download(tmpdir, nasa_item):
                 adding_headers=EXPECTED_LAST_MOD_HEADER)
        nasa_item.download(files='nasa_meta.xml')
    assert len(tmpdir.listdir()) == 1
    os.remove('nasa/nasa_meta.xml')
    with IaRequestsMock() as rsps:
        rsps.add(responses.GET, DOWNLOAD_URL_RE,
                 body='new test content',
@@ -198,26 +200,12 @@ def test_download_ignore_existing(tmpdir, nasa_item):
        assert fh.read() == 'test content'


def test_download_clobber(tmpdir, nasa_item):
    tmpdir.chdir()
    with IaRequestsMock() as rsps:
        rsps.add(responses.GET, DOWNLOAD_URL_RE,
                 body='test content',
                 adding_headers=EXPECTED_LAST_MOD_HEADER)
        nasa_item.download(files='nasa_meta.xml')

        rsps.reset()
        rsps.add(responses.GET, DOWNLOAD_URL_RE,
                 body='new test content',
                 adding_headers=EXPECTED_LAST_MOD_HEADER)
        nasa_item.download(files='nasa_meta.xml')
    assert load_file('nasa/nasa_meta.xml') == 'new test content'


def test_download_checksum(tmpdir, caplog):
    tmpdir.chdir()

    # test overwrite based on checksum.
    if os.path.exists('nasa/nasa_meta.xml'):
        os.remove('nasa/nasa_meta.xml')
    with IaRequestsMock() as rsps:
        rsps.add_metadata_mock('nasa')
        rsps.add(responses.GET, DOWNLOAD_URL_RE,
@@ -229,9 +217,10 @@ def test_download_checksum(tmpdir, caplog):

    nasa_item = get_item('nasa')
    nasa_item.download(files='nasa_meta.xml')
    nasa_item.download(files='nasa_meta.xml', checksum=True)

    assert load_file('nasa/nasa_meta.xml') == 'overwrite based on md5'
    try:
        nasa_item.download(files='nasa_meta.xml', checksum=True)
    except InvalidChecksumError as exc:
        assert "corrupt, checksums do not match." in str(exc)

    # test no overwrite based on checksum.
    with caplog.at_level(logging.DEBUG):