This repository has been archived by the owner on Dec 16, 2022. It is now read-only.
make 'cached_path' work offline #4253
Merged

Changes shown are from 3 of the 6 commits:

  b4bed00  make 'cached_path' work offline  (epwalsh)
  d283513  add test case  (epwalsh)
  2829052  Update allennlp/common/file_utils.py  (epwalsh)
  ee1e40b  log some more info  (epwalsh)
  b4caab9  Merge branch 'file-utils-offline' of github.com:epwalsh/allennlp into…  (epwalsh)
  150aaf1  else clause  (epwalsh)
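For orientation before the diff: the intended user-facing behavior, sketched under the assumption that the resource was fetched at least once while online. The URL below is illustrative, not taken from the PR.

```python
from allennlp.common.file_utils import cached_path

# First call, online: downloads the file, caches it under a name derived
# from the URL and its ETag, and returns the local path.
local_path = cached_path("https://example.com/glove.txt.gz")

# Later call, offline: the ETag lookup raises a connection error, which
# get_from_cache now catches; it logs a warning and returns the newest
# previously cached copy instead of failing.
local_path = cached_path("https://example.com/glove.txt.gz")
```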
allennlp/common/file_utils.py:
@@ -2,22 +2,24 @@
 Utilities for working with the local dataset cache.
 """

+import glob
 import os
 import logging
 import shutil
 import tempfile
 import json
 from urllib.parse import urlparse
 from pathlib import Path
-from typing import Optional, Tuple, Union, IO, Callable, Set
+from typing import Optional, Tuple, Union, IO, Callable, Set, List
 from hashlib import sha256
 from functools import wraps

 import boto3
 import botocore
-from botocore.exceptions import ClientError
+from botocore.exceptions import ClientError, EndpointConnectionError
 import requests
 from requests.adapters import HTTPAdapter
+from requests.exceptions import ConnectionError
 from requests.packages.urllib3.util.retry import Retry

 from allennlp.common.tqdm import Tqdm
@@ -124,7 +126,7 @@ def is_url_or_existing_file(url_or_filename: Union[str, Path, None]) -> bool:
     return parsed.scheme in ("http", "https", "s3") or os.path.exists(url_or_filename)


-def split_s3_path(url: str) -> Tuple[str, str]:
+def _split_s3_path(url: str) -> Tuple[str, str]:
     """Split a full s3 path into the bucket name and path."""
     parsed = urlparse(url)
     if not parsed.netloc or not parsed.path:
@@ -137,7 +139,7 @@ def split_s3_path(url: str) -> Tuple[str, str]:
     return bucket_name, s3_path


-def s3_request(func: Callable):
+def _s3_request(func: Callable):
     """
     Wrapper function for s3 requests in order to create more helpful error
     messages.
@@ -156,7 +158,7 @@ def wrapper(url: str, *args, **kwargs):
     return wrapper


-def get_s3_resource():
+def _get_s3_resource():
     session = boto3.session.Session()
     if session.get_credentials() is None:
         # Use unsigned requests.
@@ -168,24 +170,24 @@ def get_s3_resource():
     return s3_resource


-@s3_request
-def s3_etag(url: str) -> Optional[str]:
+@_s3_request
+def _s3_etag(url: str) -> Optional[str]:
     """Check ETag on S3 object."""
-    s3_resource = get_s3_resource()
-    bucket_name, s3_path = split_s3_path(url)
+    s3_resource = _get_s3_resource()
+    bucket_name, s3_path = _split_s3_path(url)
     s3_object = s3_resource.Object(bucket_name, s3_path)
     return s3_object.e_tag


-@s3_request
-def s3_get(url: str, temp_file: IO) -> None:
+@_s3_request
+def _s3_get(url: str, temp_file: IO) -> None:
     """Pull a file directly from S3."""
-    s3_resource = get_s3_resource()
-    bucket_name, s3_path = split_s3_path(url)
+    s3_resource = _get_s3_resource()
+    bucket_name, s3_path = _split_s3_path(url)
     s3_resource.Bucket(bucket_name).download_fileobj(s3_path, temp_file)


-def session_with_backoff() -> requests.Session:
+def _session_with_backoff() -> requests.Session:
     """
     We ran into an issue where http requests to s3 were timing out,
     possibly because we were making too many requests too quickly.
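Aside: the body of `_session_with_backoff` sits outside the hunks shown here; only its signature and the start of its docstring are visible. A retry-mounted session of the kind the docstring describes looks roughly like the sketch below; the retry settings are placeholders, not the values from allennlp's actual implementation.

```python
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


def _session_with_backoff() -> requests.Session:
    session = requests.Session()
    # Placeholder retry policy: retry a handful of times with exponential
    # backoff on transient server errors.
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
    session.mount("http://", HTTPAdapter(max_retries=retries))
    session.mount("https://", HTTPAdapter(max_retries=retries))
    return session
```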
@@ -201,8 +203,18 @@ def session_with_backoff() -> requests.Session:
     return session


-def http_get(url: str, temp_file: IO) -> None:
-    with session_with_backoff() as session:
+def _http_etag(url: str) -> Optional[str]:
+    with _session_with_backoff() as session:
+        response = session.head(url, allow_redirects=True)
+    if response.status_code != 200:
+        raise IOError(
+            "HEAD request failed for url {} with status code {}".format(url, response.status_code)
+        )
+    return response.headers.get("ETag")
+
+
+def _http_get(url: str, temp_file: IO) -> None:
+    with _session_with_backoff() as session:
         req = session.get(url, stream=True)
         content_length = req.headers.get("Content-Length")
         total = int(content_length) if content_length is not None else None
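One detail worth noting in the new `_http_etag`: a reachable server that answers the HEAD request with a non-200 status still raises IOError, which the offline fallback added later in this diff does not catch; only requests' ConnectionError (and botocore's EndpointConnectionError) trigger the fallback. A small illustration of the kind of failure that does qualify; the unroutable address and path are placeholders.

```python
import requests

try:
    # A socket-level failure (no route, DNS failure, no network) surfaces as
    # requests.exceptions.ConnectionError, the exception the new fallback catches.
    requests.head("http://10.255.255.1/model.tar.gz", timeout=1)
except requests.exceptions.ConnectionError as err:
    print("connection-level failure:", type(err).__name__)
```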
@@ -214,6 +226,22 @@ def http_get(url: str, temp_file: IO) -> None:
         progress.close()


+def _find_latest_cached(url: str, cache_dir: str) -> Optional[str]:
+    filename = url_to_filename(url)
+    cache_path = os.path.join(cache_dir, filename)
+    candidates: List[Tuple[str, float]] = []
+    for path in glob.glob(cache_path + "*"):
+        if path.endswith(".json"):
+            continue
+        mtime = os.path.getmtime(path)
+        candidates.append((path, mtime))
+    # Sort candidates by modification time, newest first.
+    candidates.sort(key=lambda x: x[1], reverse=True)
+    if candidates:
+        return candidates[0][0]
+    return None
+
+
 # TODO(joelgrus): do we want to do checksums or anything like that?
 def get_from_cache(url: str, cache_dir: str = None) -> str:
     """
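`_find_latest_cached` relies on how cache file names are built: `url_to_filename` (defined earlier in this file, outside the diff) starts the name with a hash of the URL and, when an ETag is known, appends a hash of the ETag, so globbing on the URL hash alone matches every cached version regardless of which ETag it was downloaded under; the skipped `.json` files are presumably the cache's metadata sidecars. A rough sketch of that naming scheme, written as a stand-in rather than the real helper:

```python
from hashlib import sha256


def url_to_filename_sketch(url: str, etag: str = None) -> str:
    # Approximation of allennlp's url_to_filename: the cache key starts with a
    # hash of the URL and, if an ETag was fetched, ends with a hash of the ETag.
    filename = sha256(url.encode("utf-8")).hexdigest()
    if etag:
        filename += "." + sha256(etag.encode("utf-8")).hexdigest()
    return filename


# Every cached version of the same URL therefore shares the same prefix:
prefix = url_to_filename_sketch("https://example.com/vocab.txt")
v1 = url_to_filename_sketch("https://example.com/vocab.txt", etag='"abc"')
v2 = url_to_filename_sketch("https://example.com/vocab.txt", etag='"def"')
assert v1.startswith(prefix) and v2.startswith(prefix)
```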
@@ -226,18 +254,30 @@ def get_from_cache(url: str, cache_dir: str = None) -> str:
     os.makedirs(cache_dir, exist_ok=True)

     # Get eTag to add to filename, if it exists.
-    if url.startswith("s3://"):
-        etag = s3_etag(url)
-    else:
-        with session_with_backoff() as session:
-            response = session.head(url, allow_redirects=True)
-        if response.status_code != 200:
-            raise IOError(
-                "HEAD request failed for url {} with status code {}".format(
-                    url, response.status_code
-                )
-            )
-        etag = response.headers.get("ETag")
+    try:
+        if url.startswith("s3://"):
+            etag = _s3_etag(url)
+        else:
+            etag = _http_etag(url)
+    except (ConnectionError, EndpointConnectionError):
+        # We might be offline, in which case we don't want to throw an error
+        # just yet. Instead, we'll try to use the latest cached version of the
+        # target resource, if it exists. We'll only throw an exception if we
+        # haven't cached the resource at all yet.
+        logger.warning(
+            "Connection error occured while trying to fetch etag for %s. "
+            "Will attempt to use latest cached version of resource",
+            url,
+        )
+        latest_cached = _find_latest_cached(url, cache_dir)
+        if latest_cached:
+            return latest_cached
+        logger.error(
+            "Connection failed while trying to fetch etag, "
+            "and no cached version of %s could be found",
+            url,
+        )
+        raise

     filename = url_to_filename(url, etag)

Review exchange attached to this block:

  This is a nit, but I prefer to have an …

  Oh yea that's totally fine with me, I'm just used to certain linters raising a fit about "unnecessary else clause". I guess either flake8 doesn't care or we're explicitly ignoring that in our …
@@ -252,9 +292,9 @@ def get_from_cache(url: str, cache_dir: str = None) -> str:

         # GET file object
         if url.startswith("s3://"):
-            s3_get(url, temp_file)
+            _s3_get(url, temp_file)
         else:
-            http_get(url, temp_file)
+            _http_get(url, temp_file)

         # we are copying the file before closing it, so flush to avoid truncation
         temp_file.flush()
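The "add test case" commit belongs to this PR, but its diff did not render in this capture. A minimal sketch of how the offline fallback could be exercised with pytest and unittest.mock; the test name, URL, and ETag strings are illustrative and not taken from the PR.

```python
import os
import time
from unittest import mock

from requests.exceptions import ConnectionError

from allennlp.common import file_utils
from allennlp.common.file_utils import get_from_cache, url_to_filename


def test_get_from_cache_falls_back_when_offline(tmp_path):
    cache_dir = str(tmp_path)
    url = "https://example.com/fake-resource.txt"

    # Pretend two versions of the resource were cached on earlier, online runs.
    now = time.time()
    for age, etag in [(60, "old-etag"), (0, "new-etag")]:
        path = os.path.join(cache_dir, url_to_filename(url, etag))
        with open(path, "w") as f:
            f.write(etag)
        os.utime(path, (now - age, now - age))

    # Simulate being offline: the ETag lookup raises a connection error.
    with mock.patch.object(file_utils, "_http_etag", side_effect=ConnectionError()):
        result = get_from_cache(url, cache_dir=cache_dir)

    # The most recently cached copy should be returned instead of raising.
    assert result == os.path.join(cache_dir, url_to_filename(url, "new-etag"))
```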
Review thread on the cache fallback:

  "Is it worth logging something here? Users probably will not hit this often."

  "In addition to the warning right above?"

  "No--my bad. Should we log the filepath of the file that will be used?"

  "Sure. Do you think INFO or WARNING level would be more appropriate?"

  "INFO?"
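The follow-up commits "log some more info" (ee1e40b) and "else clause" (150aaf1) presumably resolve this thread, but their diffs are not shown above. As a hedged illustration only, the fallback branch inside get_from_cache's except block might end up shaped like this, with an INFO-level log of the path being used and an explicit else; this is a sketch, not the committed code.

```python
        # Inside the `except (ConnectionError, EndpointConnectionError):` block
        # of get_from_cache shown above (illustrative, not the committed code).
        latest_cached = _find_latest_cached(url, cache_dir)
        if latest_cached:
            logger.info(
                "Connection failed, using latest cached version of %s: %s",
                url,
                latest_cached,
            )
            return latest_cached
        else:
            logger.error(
                "Connection failed while trying to fetch etag, "
                "and no cached version of %s could be found",
                url,
            )
            raise
```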