Skip to content

Commit

Permalink
Added SOCKSConnection creation into connection pool. Moved SOCKSConne…
Browse files Browse the repository at this point in the history
…ction to the dedicated file to hide this class under socksio import check
  • Loading branch information
cdeler committed Sep 18, 2020
1 parent e950e6e commit ca5a733
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 307 deletions.
115 changes: 1 addition & 114 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from ssl import SSLContext
from typing import List, Optional, Tuple

from socksio import socks5

from .._backends.auto import AsyncBackend, AsyncLock, AsyncSocketStream, AutoBackend
from .._exceptions import ProxyError
from .._types import URL, Headers, Origin, Socks5ProxyCredentials, TimeoutDict
from .._types import URL, Headers, Origin, TimeoutDict
from .._utils import get_logger, url_to_origin
from .base import (
AsyncByteStream,
Expand Down Expand Up @@ -167,113 +164,3 @@ async def aclose(self) -> None:
async with self.request_lock:
if self.connection is not None:
await self.connection.aclose()


class AsyncSOCKSConnection(AsyncHTTPConnection):
def __init__(
self,
origin: Origin,
http2: bool = False,
uds: str = None,
ssl_context: SSLContext = None,
socket: AsyncSocketStream = None,
local_address: str = None,
backend: AsyncBackend = None,
*,
proxy_origin: Origin,
proxy_credentials: Socks5ProxyCredentials = None,
):
assert proxy_origin[0] in (b"socks5",)

super().__init__(
origin, http2, uds, ssl_context, socket, local_address, backend
)
self.proxy_origin = proxy_origin
self.proxy_connection = socks5.SOCKS5Connection()
self.proxy_credentials = proxy_credentials

async def _open_socket(self, timeout: TimeoutDict = None) -> AsyncSocketStream:
_, proxy_hostname, proxy_port = self.proxy_origin
scheme, hostname, port = self.origin
ssl_context = self.ssl_context if scheme == b"https" else None
timeout = timeout or {}

try:
proxy_socket = await self.backend.open_tcp_stream(
proxy_hostname,
proxy_port,
None,
timeout,
local_address=self.local_address,
)

await self._auth_proxy(proxy_socket, timeout)
await self._connect_through_proxy(proxy_socket, hostname, port, timeout)

if ssl_context:
proxy_socket = await proxy_socket.start_tls(
hostname, ssl_context, timeout
)

return proxy_socket
except Exception: # noqa: PIE786
self.connect_failed = True
raise

async def _auth_proxy(
self, socket: AsyncSocketStream, timeout: TimeoutDict
) -> None:
auth_request = socks5.SOCKS5AuthMethodsRequest(
[
socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED,
socks5.SOCKS5AuthMethod.USERNAME_PASSWORD,
]
)

self.proxy_connection.send(auth_request)

bytes_to_send = self.proxy_connection.data_to_send()
await socket.write(bytes_to_send, timeout)

data = await socket.read(1024, timeout)
auth_ev = self.proxy_connection.receive_data(data)

if auth_ev.method == socks5.SOCKS5AuthMethod.USERNAME_PASSWORD: # type: ignore
if self.proxy_credentials is None:
raise ProxyError(
"This proxy requires auth, but you didn't set user/password"
)

user, password = self.proxy_credentials
user_password_request = socks5.SOCKS5UsernamePasswordRequest(user, password)
self.proxy_connection.send(user_password_request)
await socket.write(self.proxy_connection.data_to_send(), timeout)
user_password_response = await socket.read(2048, timeout)

user_password_event = self.proxy_connection.receive_data(
user_password_response
)

if not user_password_event.success: # type: ignore
raise ProxyError("Invalid user/password provided to proxy auth")

async def _connect_through_proxy(
self,
socket: AsyncSocketStream,
hostname: bytes,
port: int,
timeout: TimeoutDict,
) -> None:
connect_request = socks5.SOCKS5CommandRequest.from_address(
socks5.SOCKS5Command.CONNECT, (hostname, port)
)

self.proxy_connection.send(connect_request)
bytes_to_send = self.proxy_connection.data_to_send()

await socket.write(bytes_to_send, timeout)
data = await socket.read(1024, timeout)
event = self.proxy_connection.receive_data(data)

# development only assert
assert event.reply_code == socks5.SOCKS5ReplyCode.SUCCEEDED # type: ignore
48 changes: 39 additions & 9 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib.util
import warnings
from ssl import SSLContext
from typing import AsyncIterator, Callable, Dict, List, Optional, Set, Tuple
Expand All @@ -6,7 +7,7 @@
from .._backends.base import lookup_async_backend
from .._exceptions import LocalProtocolError, PoolTimeout, UnsupportedProtocol
from .._threadlock import ThreadLock
from .._types import URL, Headers, Origin, TimeoutDict
from .._types import URL, Headers, Origin, Socks5ProxyConfig, TimeoutDict
from .._utils import get_logger, origin_to_url_string, url_to_origin
from .base import (
AsyncByteStream,
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(
local_address: str = None,
max_keepalive: int = None,
backend: str = "auto",
socks5_proxy_conf: Socks5ProxyConfig = None,
):
if max_keepalive is not None:
warnings.warn(
Expand All @@ -117,6 +119,7 @@ def __init__(
self._thread_lock = ThreadLock()
self._backend = lookup_async_backend(backend)
self._next_keepalive_check = 0.0
self._socks5_proxy_conf = socks5_proxy_conf

if http2:
try:
Expand All @@ -127,6 +130,14 @@ def __init__(
"package is not installed. Use 'pip install httpcore[http2]'."
)

if socks5_proxy_conf is not None:
module_spec = importlib.util.find_spec("socksio")
if module_spec is None:
raise ImportError(
"Attempted to use socks5 proxy, but "
"'socksio' module is not installed."
)

@property
def _connection_semaphore(self) -> AsyncSemaphore:
# We do this lazily, to make sure backend autodetection always
Expand Down Expand Up @@ -175,14 +186,33 @@ async def request(
connection = await self._get_connection_from_pool(origin)

if connection is None:
connection = AsyncHTTPConnection(
origin=origin,
http2=self._http2,
uds=self._uds,
ssl_context=self._ssl_context,
local_address=self._local_address,
backend=self._backend,
)
if self._socks5_proxy_conf:
from .socks_proxy import AsyncSOCKSConnection

proxy_origin = (
b"socks5",
self._socks5_proxy_conf.host,
self._socks5_proxy_conf.port,
)
connection = AsyncSOCKSConnection(
origin=origin,
http2=self._http2,
uds=self._uds,
ssl_context=self._ssl_context,
local_address=self._local_address,
backend=self._backend,
proxy_origin=proxy_origin,
proxy_credentials=self._socks5_proxy_conf.auth_credentials,
)
else:
connection = AsyncHTTPConnection(
origin=origin,
http2=self._http2,
uds=self._uds,
ssl_context=self._ssl_context,
local_address=self._local_address,
backend=self._backend,
)
logger.trace("created connection=%r", connection)
await self._add_to_pool(connection, timeout=timeout)
else:
Expand Down
121 changes: 121 additions & 0 deletions httpcore/_async/socks_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from ssl import SSLContext

from socksio import socks5

from .._backends.auto import AsyncBackend, AsyncSocketStream
from .._exceptions import ProxyError
from .._types import Origin, Socks5ProxyCredentials, TimeoutDict
from .._utils import get_logger
from .connection import AsyncHTTPConnection

logger = get_logger(__name__)


class AsyncSOCKSConnection(AsyncHTTPConnection):
def __init__(
self,
origin: Origin,
http2: bool = False,
uds: str = None,
ssl_context: SSLContext = None,
socket: AsyncSocketStream = None,
local_address: str = None,
backend: AsyncBackend = None,
*,
proxy_origin: Origin,
proxy_credentials: Socks5ProxyCredentials = None,
):
assert proxy_origin[0] in (b"socks5",)

super().__init__(
origin, http2, uds, ssl_context, socket, local_address, backend
)
self.proxy_origin = proxy_origin
self.proxy_connection = socks5.SOCKS5Connection()
self.proxy_credentials = proxy_credentials

async def _open_socket(self, timeout: TimeoutDict = None) -> AsyncSocketStream:
_, proxy_hostname, proxy_port = self.proxy_origin
scheme, hostname, port = self.origin
ssl_context = self.ssl_context if scheme == b"https" else None
timeout = timeout or {}

try:
proxy_socket = await self.backend.open_tcp_stream(
proxy_hostname,
proxy_port,
None,
timeout,
local_address=self.local_address,
)

await self._auth_proxy(proxy_socket, timeout)
await self._connect_through_proxy(proxy_socket, hostname, port, timeout)

if ssl_context:
proxy_socket = await proxy_socket.start_tls(
hostname, ssl_context, timeout
)

return proxy_socket
except Exception: # noqa: PIE786
self.connect_failed = True
raise

async def _auth_proxy(
self, socket: AsyncSocketStream, timeout: TimeoutDict
) -> None:
auth_request = socks5.SOCKS5AuthMethodsRequest(
[
socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED,
socks5.SOCKS5AuthMethod.USERNAME_PASSWORD,
]
)

self.proxy_connection.send(auth_request)

bytes_to_send = self.proxy_connection.data_to_send()
await socket.write(bytes_to_send, timeout)

data = await socket.read(1024, timeout)
auth_ev = self.proxy_connection.receive_data(data)

if auth_ev.method == socks5.SOCKS5AuthMethod.USERNAME_PASSWORD: # type: ignore
if self.proxy_credentials is None:
raise ProxyError(
"This proxy requires auth, but you didn't set user/password"
)

user, password = self.proxy_credentials
user_password_request = socks5.SOCKS5UsernamePasswordRequest(user, password)
self.proxy_connection.send(user_password_request)
await socket.write(self.proxy_connection.data_to_send(), timeout)
user_password_response = await socket.read(2048, timeout)

user_password_event = self.proxy_connection.receive_data(
user_password_response
)

if not user_password_event.success: # type: ignore
raise ProxyError("Invalid user/password provided to proxy auth")

async def _connect_through_proxy(
self,
socket: AsyncSocketStream,
hostname: bytes,
port: int,
timeout: TimeoutDict,
) -> None:
connect_request = socks5.SOCKS5CommandRequest.from_address(
socks5.SOCKS5Command.CONNECT, (hostname, port)
)

self.proxy_connection.send(connect_request)
bytes_to_send = self.proxy_connection.data_to_send()

await socket.write(bytes_to_send, timeout)
data = await socket.read(1024, timeout)
event = self.proxy_connection.receive_data(data)

# development only assert
assert event.reply_code == socks5.SOCKS5ReplyCode.SUCCEEDED # type: ignore
Loading

0 comments on commit ca5a733

Please sign in to comment.