Skip to content

Commit

Permalink
Merge pull request #603 from jjjake/resume-download
Browse files Browse the repository at this point in the history
resume downloads automatically
  • Loading branch information
jjjake authored Apr 12, 2024
2 parents 8c6d9b1 + a61886a commit 6ba9bcf
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 102 deletions.
8 changes: 8 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
Release History
---------------

4.0.0 (2024-04-12)
++++++++++++++++++

**Features and Improvements**

- Partially downloaded files will now automatically resume where they left off when retried.
- Use ``Last-Modified`` header to set all mtimes (this includes files.xml now).

3.7.0 (2024-03-19)
++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion internetarchive/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '3.7.0'
__version__ = '4.0.0.dev1'
11 changes: 10 additions & 1 deletion internetarchive/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ class AuthenticationError(Exception):

class ItemLocateError(Exception):
    """Raised when an item cannot be located because it is dark or does not exist."""

    def __init__(self, *args, **kwargs):
        # Use a descriptive default message only when the caller passes no
        # arguments of their own; otherwise defer entirely to Exception.
        default_message = "Item cannot be located because it is dark or does not exist."
        if args or kwargs:
            super().__init__(*args, **kwargs)
        else:
            super().__init__(default_message)


class InvalidChecksumError(Exception):
def __init__(self, *args, **kwargs):
default_message = "File corrupt, checksums do not match."
if args or kwargs:
super().__init__(*args, **kwargs)
else:
Expand Down
212 changes: 131 additions & 81 deletions internetarchive/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import sys
from contextlib import nullcontext, suppress
from email.utils import parsedate_to_datetime
from time import sleep
from urllib.parse import quote

from requests.exceptions import (
Expand All @@ -40,7 +41,7 @@
)
from tqdm import tqdm

from internetarchive import auth, iarequest, utils
from internetarchive import auth, exceptions, iarequest, utils

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -137,7 +138,8 @@ def __repr__(self):
f'size={self.size!r}, '
f'format={self.format!r})')

def download(self, file_path=None, verbose=None, ignore_existing=None,
def download(# noqa: max-complexity=38
self, file_path=None, verbose=None, ignore_existing=None,
checksum=None, destdir=None, retries=None, ignore_errors=None,
fileobj=None, return_responses=None, no_change_timestamp=None,
params=None, chunk_size=None, stdout=None, ors=None,
Expand Down Expand Up @@ -205,6 +207,9 @@ def download(self, file_path=None, verbose=None, ignore_existing=None,
no_change_timestamp = no_change_timestamp or False
params = params or None
timeout = 12 if not timeout else timeout
headers = {}
retries_sleep = 3 # TODO: exponential sleep
retrying = False # for retry loop

self.item.session.mount_http_adapter(max_retries=retries)
file_path = file_path or self.name
Expand All @@ -220,93 +225,138 @@ def download(self, file_path=None, verbose=None, ignore_existing=None,
file_path = os.path.join(destdir, file_path)

parent_dir = os.path.dirname(file_path)
try:
if parent_dir != '' and return_responses is not True:
os.makedirs(parent_dir, exist_ok=True)

response = self.item.session.get(self.url,
stream=True,
timeout=timeout,
auth=self.auth,
params=params)

# Get timestamp from Last-Modified header
dt = parsedate_to_datetime(response.headers['Last-Modified'])
last_mod_mtime = dt.timestamp()

response.raise_for_status()

# Check if we should skip...
if not return_responses and os.path.exists(file_path.encode('utf-8')):
if ignore_existing:
msg = f'skipping {file_path}, file already exists.'
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return
elif checksum:
with open(file_path, 'rb') as fp:
md5_sum = utils.get_md5(fp)

if md5_sum == self.md5:
msg = f'skipping {file_path}, file already exists based on checksum.'

# Retry loop
while True:
try:
if parent_dir != '' and return_responses is not True:
os.makedirs(parent_dir, exist_ok=True)

if not return_responses \
and not ignore_existing \
and self.name != f'{self.identifier}_files.xml' \
and os.path.exists(file_path.encode('utf-8')):
st = os.stat(file_path.encode('utf-8'))
if st.st_size != self.size and not checksum:
headers = {"Range": f"bytes={st.st_size}-{self.size}"}

response = self.item.session.get(self.url,
stream=True,
timeout=timeout,
auth=self.auth,
params=params,
headers=headers)

# Get timestamp from Last-Modified header
last_mod_header = response.headers.get('Last-Modified')
if last_mod_header:
dt = parsedate_to_datetime(last_mod_header)
last_mod_mtime = dt.timestamp()
else:
last_mod_mtime = 0

response.raise_for_status()

# Check if we should skip...
if not return_responses and os.path.exists(file_path.encode('utf-8')):
if ignore_existing:
msg = f'skipping {file_path}, file already exists.'
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return
elif not fileobj:
st = os.stat(file_path.encode('utf-8'))
if st.st_mtime == last_mod_mtime:
if self.name == f'{self.identifier}_files.xml' \
or (st.st_size == self.size):
msg = (f'skipping {file_path}, file already exists based on '
'length and date.')
elif checksum:
with open(file_path, 'rb') as fp:
md5_sum = utils.get_md5(fp)

if md5_sum == self.md5:
msg = f'skipping {file_path}, file already exists based on checksum.'
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return

elif return_responses:
return response

if verbose:
total = int(response.headers.get('content-length', 0)) or None
progress_bar = tqdm(desc=f' downloading {self.name}',
total=total,
unit='iB',
unit_scale=True,
unit_divisor=1024)
else:
progress_bar = nullcontext()

if not chunk_size:
chunk_size = 1048576
if stdout:
fileobj = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
if not fileobj:
fileobj = open(file_path.encode('utf-8'), 'wb')

with fileobj, progress_bar as bar:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
size = fileobj.write(chunk)
if bar is not None:
bar.update(size)
if ors:
fileobj.write(os.environ.get("ORS", "\n").encode("utf-8"))
except (RetryError, HTTPError, ConnectTimeout, OSError, ReadTimeout) as exc:
msg = f'error downloading file {file_path}, exception raised: {exc}'
log.error(msg)
try:
os.remove(file_path)
except OSError:
pass
if verbose:
print(f' {msg}', file=sys.stderr)
if ignore_errors:
return False
else:
raise exc
elif not fileobj:
st = os.stat(file_path.encode('utf-8'))
if st.st_mtime == last_mod_mtime:
if self.name == f'{self.identifier}_files.xml' \
or (st.st_size == self.size):
msg = (f'skipping {file_path}, file already exists based on '
'length and date.')
log.info(msg)
if verbose:
print(f' {msg}', file=sys.stderr)
return

elif return_responses:
return response



if verbose:
total = int(response.headers.get('content-length', 0)) or None
progress_bar = tqdm(desc=f' downloading {self.name}',
total=total,
unit='iB',
unit_scale=True,
unit_divisor=1024)
else:
progress_bar = nullcontext()

if not chunk_size:
chunk_size = 1048576
if stdout:
fileobj = os.fdopen(sys.stdout.fileno(), 'wb', closefd=False)
if not fileobj or retrying:
if 'Range' in headers:
fileobj = open(file_path.encode('utf-8'), 'ab')
else:
fileobj = open(file_path.encode('utf-8'), 'wb')

with fileobj, progress_bar as bar:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
size = fileobj.write(chunk)
if bar is not None:
bar.update(size)
if ors:
fileobj.write(os.environ.get("ORS", "\n").encode("utf-8"))

if 'Range' in headers:
with open(file_path, 'rb') as fh:
local_checksum = utils.get_md5(fh)
try:
assert local_checksum == self.md5
except AssertionError:
msg = (f"\"{file_path}\" corrupt, "
"checksums do not match. "
"Remote file may have been modified, "
"retry download.")
os.remove(file_path.encode('utf-8'))
raise exceptions.InvalidChecksumError(msg)
break
except (RetryError, HTTPError, ConnectTimeout, OSError, ReadTimeout,
exceptions.InvalidChecksumError) as exc:
if retries > 0:
retrying = True
retries -= 1
msg = ('download failed, sleeping for '
f'{retries_sleep} seconds and retrying. '
f'{retries} retries left.')
log.warning(msg)
sleep(retries_sleep)
continue
msg = f'error downloading file {file_path}, exception raised: {exc}'
log.error(msg)
try:
os.remove(file_path)
except OSError:
pass
if verbose:
print(f' {msg}', file=sys.stderr)
if ignore_errors:
return False
else:
raise exc

# Set mtime with timestamp from Last-Modified header
if not no_change_timestamp:
Expand Down
27 changes: 8 additions & 19 deletions tests/test_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import internetarchive.files
from internetarchive import get_session
from internetarchive.api import get_item
from internetarchive.exceptions import InvalidChecksumError
from internetarchive.utils import InvalidIdentifierException, json, norm_filepath
from tests.conftest import (
NASA_METADATA_PATH,
Expand Down Expand Up @@ -151,6 +152,7 @@ def test_download(tmpdir, nasa_item):
adding_headers=EXPECTED_LAST_MOD_HEADER)
nasa_item.download(files='nasa_meta.xml')
assert len(tmpdir.listdir()) == 1
os.remove('nasa/nasa_meta.xml')
with IaRequestsMock() as rsps:
rsps.add(responses.GET, DOWNLOAD_URL_RE,
body='new test content',
Expand Down Expand Up @@ -198,26 +200,12 @@ def test_download_ignore_existing(tmpdir, nasa_item):
assert fh.read() == 'test content'


# NOTE(review): checks that a repeat download overwrites ("clobbers") an existing
# file when the server returns new content under the same Last-Modified header.
# Assumes IaRequestsMock / nasa_item / load_file come from tests.conftest — confirm.
def test_download_clobber(tmpdir, nasa_item):
tmpdir.chdir()
with IaRequestsMock() as rsps:
rsps.add(responses.GET, DOWNLOAD_URL_RE,
body='test content',
adding_headers=EXPECTED_LAST_MOD_HEADER)
nasa_item.download(files='nasa_meta.xml')

rsps.reset()
rsps.add(responses.GET, DOWNLOAD_URL_RE,
body='new test content',
adding_headers=EXPECTED_LAST_MOD_HEADER)
nasa_item.download(files='nasa_meta.xml')
assert load_file('nasa/nasa_meta.xml') == 'new test content'


def test_download_checksum(tmpdir, caplog):
tmpdir.chdir()

# test overwrite based on checksum.
if os.path.exists('nasa/nasa_meta.xml'):
os.remove('nasa/nasa_meta.xml')
with IaRequestsMock() as rsps:
rsps.add_metadata_mock('nasa')
rsps.add(responses.GET, DOWNLOAD_URL_RE,
Expand All @@ -229,9 +217,10 @@ def test_download_checksum(tmpdir, caplog):

nasa_item = get_item('nasa')
nasa_item.download(files='nasa_meta.xml')
nasa_item.download(files='nasa_meta.xml', checksum=True)

assert load_file('nasa/nasa_meta.xml') == 'overwrite based on md5'
try:
nasa_item.download(files='nasa_meta.xml', checksum=True)
except InvalidChecksumError as exc:
assert "corrupt, checksums do not match." in str(exc)

# test no overwrite based on checksum.
with caplog.at_level(logging.DEBUG):
Expand Down

0 comments on commit 6ba9bcf

Please sign in to comment.