Skip to content

Commit

Permalink
aiosonic node transport
Browse files Browse the repository at this point in the history
  • Loading branch information
sonic182 committed Aug 5, 2024
1 parent b434f18 commit 1750ea4
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 0 deletions.
1 change: 1 addition & 0 deletions elastic_transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from ._models import ApiResponseMeta, HttpHeaders, NodeConfig, SniffOptions
from ._node import (
AiohttpHttpNode,
AiosonicHttpNode,
BaseAsyncNode,
BaseNode,
HttpxAsyncHttpNode,
Expand Down
2 changes: 2 additions & 0 deletions elastic_transport/_node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
from ._base import BaseNode, NodeApiResponse
from ._base_async import BaseAsyncNode
from ._http_aiohttp import AiohttpHttpNode
from ._http_aiosonic import AiosonicHttpNode
from ._http_httpx import HttpxAsyncHttpNode
from ._http_requests import RequestsHttpNode
from ._http_urllib3 import Urllib3HttpNode

__all__ = [
"AiohttpHttpNode",
"AiosonicHttpNode",
"BaseNode",
"BaseAsyncNode",
"NodeApiResponse",
Expand Down
212 changes: 212 additions & 0 deletions elastic_transport/_node/_http_aiosonic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio
import gzip
import os
import re
import ssl
import warnings
from typing import Optional, Union

from .._compat import warn_stacklevel
from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
from .._models import ApiResponseMeta, HttpHeaders, NodeConfig
from ..client_utils import DEFAULT, DefaultType, client_meta_version
from ._base import (
BUILTIN_EXCEPTIONS,
DEFAULT_CA_CERTS,
RERAISE_EXCEPTIONS,
NodeApiResponse,
ssl_context_from_node_config,
)
from ._base_async import BaseAsyncNode

try:
from aiosonic.timeout import Timeouts
from aiosonic.connection import TCPConnector
from aiosonic import HTTPClient
from aiosonic.version import VERSION
from aiosonic import exceptions as aiosonic_exceptions
from aiosonic.resolver import get_loop

_AIOSONIC_AVAILABLE = True
_AIOSONIC_META_VERSION = client_meta_version(VERSION)

_version_parts = []
for _version_part in VERSION.split(".")[:3]:
try:
_version_parts.append(int(re.search(r"^([0-9]+)", _version_part).group(1))) # type: ignore[union-attr]
except (AttributeError, ValueError):
break

except ImportError: # pragma: nocover
_AIOSONIC_AVAILABLE = False


class AiosonicHttpNode(BaseAsyncNode):
"""Default asynchronous node class using the ``aiohttp`` library via HTTP"""

_CLIENT_META_HTTP_CLIENT = ("as", _AIOSONIC_META_VERSION)
_BIG_TIMEOUT = 60 * 2 # 2 min timeout

def __init__(self, config: NodeConfig):
if not _AIOSONIC_AVAILABLE: # pragma: nocover
raise ValueError("You must have 'aiosonic' installed to use AiosonicHttpNode")

super().__init__(config)

ssl_context: Optional[ssl.SSLContext] = None
if config.scheme == "https":
if config.ssl_context is not None:
ssl_context = ssl_context_from_node_config(config)
else:
ssl_context = ssl_context_from_node_config(config)

ca_certs = (
DEFAULT_CA_CERTS if config.ca_certs is None else config.ca_certs
)
if config.verify_certs:
if not ca_certs:
raise ValueError(
"Root certificates are missing for certificate "
"validation. Either pass them in using the ca_certs parameter or "
"install certifi to use it automatically."
)
else:
if config.ssl_show_warn:
warnings.warn(
f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure",
stacklevel=warn_stacklevel(),
category=SecurityWarning,
)

if ca_certs is not None:
if os.path.isfile(ca_certs):
ssl_context.load_verify_locations(cafile=ca_certs)
elif os.path.isdir(ca_certs):
ssl_context.load_verify_locations(capath=ca_certs)
else:
raise ValueError("ca_certs parameter is not a path")

# Use client_cert and client_key variables for SSL certificate configuration.
if config.client_cert and not os.path.isfile(config.client_cert):
raise ValueError("client_cert is not a path to a file")
if config.client_key and not os.path.isfile(config.client_key):
raise ValueError("client_key is not a path to a file")
if config.client_cert and config.client_key:
ssl_context.load_cert_chain(config.client_cert, config.client_key)
elif config.client_cert:
ssl_context.load_cert_chain(config.client_cert)

self._loop: asyncio.AbstractEventLoop = None # type: ignore[assignment]
self.client = HTTPClient(
connector=TCPConnector(
pool_size=config.connections_per_node,
use_dns_cache=True,
),
)

self._ssl_context = ssl_context

async def perform_request( # type: ignore[override]
self,
method: str,
target: str,
body: Optional[bytes] = None,
headers: Optional[HttpHeaders] = None,
request_timeout: Union[DefaultType, Optional[float]] = DEFAULT,
) -> NodeApiResponse:
url = self.base_url + target

if not self._loop:
self._loop = get_loop()

timeouts = Timeouts(request_timeout=request_timeout.value if request_timeout.value else self._BIG_TIMEOUT)

request_headers = self._headers.copy()
if headers:
request_headers.update(headers)

body_to_send: Optional[bytes]
if body:
if self._http_compress:
body_to_send = gzip.compress(body)
request_headers["content-encoding"] = "gzip"
else:
body_to_send = body
else:
body_to_send = None

try:
start = self._loop.time()
response = await self.client.request(
method=method,
url=url,
data=body_to_send,
headers=request_headers,
timeouts=timeouts,
ssl=self._ssl_context or None,
)
raw_data = await response.content()
duration = self._loop.time() - start

# We want to reraise a cancellation or recursion error.
except RERAISE_EXCEPTIONS:
raise
except Exception as e:
err: Exception
if isinstance(
e, (asyncio.TimeoutError, aiosonic_exceptions.TimeoutException)
):
err = ConnectionTimeout(
"Connection timed out during request", errors=(e,)
)
elif isinstance(e, (ssl.SSLError)):
err = TlsError(str(e), errors=(e,))
elif isinstance(e, BUILTIN_EXCEPTIONS):
raise
else:
err = ConnectionError(str(e), errors=(e,))
self._log_request(
method=method,
target=target,
headers=request_headers,
body=body,
exception=err,
)
raise err from None

meta = ApiResponseMeta(
node=self.config,
duration=duration,
http_version="1.1",
status=response.status_code,
headers=HttpHeaders(response.headers),
)
self._log_request(
method=method,
target=target,
headers=request_headers,
body=body,
meta=meta,
response=raw_data,
)
return NodeApiResponse(
meta,
raw_data,
)
2 changes: 2 additions & 0 deletions elastic_transport/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
)
from ._node import (
AiohttpHttpNode,
AiosonicHttpNode,
BaseNode,
HttpxAsyncHttpNode,
RequestsHttpNode,
Expand All @@ -70,6 +71,7 @@
"urllib3": Urllib3HttpNode,
"requests": RequestsHttpNode,
"aiohttp": AiohttpHttpNode,
"aiosonic": AiosonicHttpNode,
"httpxasync": HttpxAsyncHttpNode,
}
# These are HTTP status errors that shouldn't be considered
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"trustme",
"requests",
"aiohttp",
"aiosonic",
"httpx",
"respx",
"opentelemetry-api",
Expand Down

0 comments on commit 1750ea4

Please sign in to comment.