-
-
Notifications
You must be signed in to change notification settings - Fork 292
/
Copy pathfetcher.py
139 lines (121 loc) · 5.27 KB
/
fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import absolute_import
import ssl
import time
from contextlib import closing, contextmanager
from pex.auth import PasswordDatabase, PasswordEntry
from pex.compatibility import (
FileHandler,
HTTPBasicAuthHandler,
HTTPDigestAuthHandler,
HTTPError,
HTTPPasswordMgrWithDefaultRealm,
HTTPSHandler,
ProxyHandler,
Request,
build_opener,
)
from pex.network_configuration import NetworkConfiguration
from pex.typing import TYPE_CHECKING, cast
from pex.version import __version__
# The typing names below are used by `# type:` comments, so they are imported only for the
# type checker.
if TYPE_CHECKING:
from typing import BinaryIO, Dict, Iterable, Iterator, Mapping, Optional, Text
else:
# `BinaryIO` is additionally evaluated at runtime as the first argument to `cast(...)` inside
# `URLFetcher`, so the name must be bound to something when not type checking.
BinaryIO = None
class URLFetcher(object):
    """Fetches the content of URLs using stdlib urllib handlers.

    A fetcher is configured once with proxy, TLS and (optionally) `file://` handlers plus a
    password database. HTTP basic and digest auth handlers are added per request when that
    database has entries. Opening a URL is retried with exponential backoff on failures that may
    be transient.
    """

    USER_AGENT = "pex/{version}".format(version=__version__)

    # HTTP status codes treated as transient and therefore retried.
    # See: https://tools.ietf.org/html/rfc2616#page-39
    _RETRYABLE_HTTP_CODES = frozenset(
        (
            408,  # Request Time-out
            500,  # Internal Server Error
            503,  # Service Unavailable
            504,  # Gateway Time-out
        )
    )

    def __init__(
        self,
        network_configuration=None,  # type: Optional[NetworkConfiguration]
        handle_file_urls=False,  # type: bool
        password_entries=(),  # type: Iterable[PasswordEntry]
        netrc_file="~/.netrc",  # type: Optional[str]
    ):
        # type: (...) -> None
        """Configure the handlers and credentials used for every fetch.

        :param network_configuration: Source of the timeout, retry count, CA bundle (`cert`),
                                      client certificate (`client_cert`) and `proxy` settings. A
                                      default `NetworkConfiguration()` is used when not given.
        :param handle_file_urls: When true, a `FileHandler` is installed as well.
        :param password_entries: Extra entries appended to the password database loaded from
                                 `netrc_file`.
        :param netrc_file: Passed through to `PasswordDatabase.from_netrc`.
        """
        network_configuration = network_configuration or NetworkConfiguration()

        self._timeout = network_configuration.timeout
        self._max_retries = network_configuration.retries

        ssl_context = ssl.create_default_context(cafile=network_configuration.cert)
        if network_configuration.client_cert:
            ssl_context.load_cert_chain(network_configuration.client_cert)

        # Leaving `proxies` as None hands proxy selection to ProxyHandler's own default.
        proxies = None  # type: Optional[Dict[str, str]]
        if network_configuration.proxy:
            proxies = {protocol: network_configuration.proxy for protocol in ("http", "https")}

        handlers = [ProxyHandler(proxies), HTTPSHandler(context=ssl_context)]
        if handle_file_urls:
            handlers.append(FileHandler())

        self._password_database = PasswordDatabase.from_netrc(netrc_file=netrc_file).append(
            password_entries
        )
        self._handlers = tuple(handlers)

    def _open_with_retries(
        self,
        url,  # type: Text
        extra_headers=None,  # type: Optional[Mapping[str, str]]
    ):
        # type: (...) -> BinaryIO
        """Open `url` and return the response stream, retrying failures that may be transient.

        The delay before a retry starts at 0.1 seconds and doubles after each one.

        :raises HTTPError: Immediately for a status code that is not considered retryable.
        :raises Exception: The last error seen once all attempts have failed.
        """
        handlers = list(self._handlers)
        if self._password_database.entries:
            password_manager = HTTPPasswordMgrWithDefaultRealm()
            for password_entry in self._password_database.entries:
                # N.B.: The password manager adds a second entry implicitly if the URI we hand it
                # does not include port information (80 for http URIs and 443 for https URIs).
                password_manager.add_password(
                    realm=None,
                    uri=password_entry.uri_or_default(url),
                    user=password_entry.username,
                    passwd=password_entry.password,
                )
            handlers.extend(
                (HTTPBasicAuthHandler(password_manager), HTTPDigestAuthHandler(password_manager))
            )

        headers = dict(extra_headers) if extra_headers else {}
        headers["User-Agent"] = self.USER_AGENT

        retry_delay_secs = 0.1
        last_error = None  # type: Optional[Exception]
        # Always make at least one attempt, even if the configured retry count is negative.
        for attempt in range(max(self._max_retries, 0) + 1):
            if attempt > 0:
                time.sleep(retry_delay_secs)
                retry_delay_secs *= 2

            opener = build_opener(*handlers)
            request = Request(
                # N.B.: MyPy incorrectly thinks url must be a str in Python 2 where a unicode url
                # actually works fine.
                url,  # type: ignore[arg-type]
                headers=headers,
            )
            try:
                # The fp is typed as Optional[...] for Python 2 only in the typeshed. A `None`
                # can only be returned if a faulty custom handler is installed and we only
                # install stdlib handlers.
                return cast(BinaryIO, opener.open(request, timeout=self._timeout))
            except HTTPError as e:
                if e.code not in self._RETRYABLE_HTTP_CODES:
                    raise
                last_error = e
            except (IOError, OSError) as e:
                # Unfortunately errors are overly broad at this point. We can get either OSError
                # or URLError (a subclass of OSError) which at times indicates retryable socket
                # level errors. Since retrying a non-retryable socket level error just wastes
                # local machine resources we err towards always retrying.
                last_error = e
        raise cast(Exception, last_error)

    @contextmanager
    def get_body_stream(
        self,
        url,  # type: Text
        extra_headers=None,  # type: Optional[Mapping[str, str]]
    ):
        # type: (...) -> Iterator[BinaryIO]
        """Yield the open binary response stream for `url`, closing it when the context exits.

        :param url: The URL to fetch.
        :param extra_headers: Optional headers to send; `User-Agent` is always overridden with
                              `USER_AGENT`.
        :raises HTTPError: Immediately for a status code that is not considered retryable.
        :raises Exception: The last error seen once all attempts to open the URL have failed.

        Only opening the URL is retried. Errors raised inside the caller's `with` body propagate
        unchanged: a `@contextmanager` generator may yield only once, so the body cannot be
        re-run.
        """
        with closing(self._open_with_retries(url, extra_headers=extra_headers)) as body_stream:
            yield body_stream

    @contextmanager
    def get_body_iter(self, url):
        # type: (Text) -> Iterator[Iterator[Text]]
        """Yield an iterator over the lines of the body of `url`, each decoded as UTF-8.

        All lines are read from the stream (via `readlines()`) before the iterator is yielded.
        """
        with self.get_body_stream(url) as body_stream:
            yield (line.decode("utf-8") for line in body_stream.readlines())