diff --git a/dev-requirements.txt b/dev-requirements.txt
index c41d953c..487c1271 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -8,6 +8,7 @@ sphinx_rtd_theme
 jinja2
 pytz
 deepmerge
+Events

 # No wheels for Python 3.10 yet!
 numpy; python_version<"3.10"
diff --git a/opensearchpy/_async/http_aiohttp.py b/opensearchpy/_async/http_aiohttp.py
index c49fd574..ed9a5515 100644
--- a/opensearchpy/_async/http_aiohttp.py
+++ b/opensearchpy/_async/http_aiohttp.py
@@ -28,11 +28,14 @@
 import asyncio
 import os
 import ssl
+import time
 import warnings
 from typing import Any, Collection, Mapping, Optional, Union

 import urllib3

+from opensearchpy.metrics import TimeMetrics
+
 from ..compat import reraise_exceptions, urlencode
 from ..connection.base import Connection
 from ..exceptions import (
@@ -130,6 +133,7 @@ def __init__(
         """

         self.headers = {}
+        self.kwargs = kwargs

         super().__init__(
             host=host,
@@ -291,8 +295,14 @@ async def perform_request(
             body = self._gzip_compress(body)
             req_headers["content-encoding"] = "gzip"

-        start = self.loop.time()
+        calculate_service_time = False
+        if "calculate_service_time" in self.kwargs:
+            calculate_service_time = self.kwargs["calculate_service_time"]
+
+        time_metrics = TimeMetrics()
+
         try:
+            time_metrics.events.server_request_start()
             async with self.session.request(
                 method,
                 url,
@@ -306,7 +316,7 @@ async def perform_request(
                     raw_data = ""
                 else:
                     raw_data = await response.text()
-            duration = self.loop.time() - start
+            time_metrics.events.server_request_end()

         # We want to reraise a cancellation or recursion error.
         except reraise_exceptions:
@@ -317,7 +327,7 @@ async def perform_request(
                 url,
                 url_path,
                 orig_body,
-                self.loop.time() - start,
+                time.perf_counter() - time_metrics.start_time,
                 exception=e,
             )
             if isinstance(e, aiohttp_exceptions.ServerFingerprintMismatch):
@@ -339,7 +349,7 @@ async def perform_request(
                 url,
                 url_path,
                 orig_body,
-                duration,
+                time_metrics.service_time,
                 status_code=response.status,
                 response=raw_data,
             )
@@ -350,10 +360,24 @@ async def perform_request(
             )

         self.log_request_success(
-            method, url, url_path, orig_body, response.status, raw_data, duration
+            method,
+            url,
+            url_path,
+            orig_body,
+            response.status,
+            raw_data,
+            time_metrics.service_time,
         )

-        return response.status, response.headers, raw_data
+        if calculate_service_time:
+            return (
+                response.status,
+                response.headers,
+                raw_data,
+                time_metrics.service_time,
+            )
+        else:
+            return response.status, response.headers, raw_data

     async def close(self) -> Any:
         """
diff --git a/opensearchpy/_async/transport.py b/opensearchpy/_async/transport.py
index e8b17252..7febdcc3 100644
--- a/opensearchpy/_async/transport.py
+++ b/opensearchpy/_async/transport.py
@@ -382,19 +382,39 @@ async def perform_request(
             method, params, body
         )

+        calculate_service_time = False
+        if "calculate_service_time" in self.kwargs:
+            calculate_service_time = self.kwargs["calculate_service_time"]
+
         for attempt in range(self.max_retries + 1):
             connection = self.get_connection()

             try:
-                status, headers_response, data = await connection.perform_request(
-                    method,
-                    url,
-                    params,
-                    body,
-                    headers=headers,
-                    ignore=ignore,
-                    timeout=timeout,
-                )
+                if calculate_service_time:
+                    (
+                        status,
+                        headers_response,
+                        data,
+                        service_time,
+                    ) = await connection.perform_request(
+                        method,
+                        url,
+                        params,
+                        body,
+                        headers=headers,
+                        ignore=ignore,
+                        timeout=timeout,
+                    )
+                else:
+                    status, headers_response, data = await connection.perform_request(
+                        method,
+                        url,
+                        params,
+                        body,
+                        headers=headers,
+                        ignore=ignore,
+                        timeout=timeout,
+                    )

                 # Lowercase all the header names for consistency in accessing them.
                 headers_response = {
@@ -437,6 +457,8 @@ async def perform_request(
                 data = self.deserializer.loads(
                     data, headers_response.get("content-type")
                 )
+            if calculate_service_time:
+                data["client_metrics"] = {"service_time": service_time}
             return data

     async def close(self) -> None:
diff --git a/opensearchpy/connection/http_async.py b/opensearchpy/connection/http_async.py
index f5a4ec7c..0bcf2e44 100644
--- a/opensearchpy/connection/http_async.py
+++ b/opensearchpy/connection/http_async.py
@@ -12,9 +12,12 @@
 import asyncio
 import os
 import ssl
+import time
 import warnings
 from typing import Any, Collection, Mapping, Optional, Union

+from opensearchpy.metrics import TimeMetrics
+
 from .._async._extra_imports import aiohttp, aiohttp_exceptions  # type: ignore
 from .._async.compat import get_running_loop
 from .._async.http_aiohttp import AIOHttpConnection
@@ -55,7 +58,7 @@ def __init__(
         **kwargs: Any
     ) -> None:
         self.headers = {}
-
+        self.kwargs = kwargs
         super().__init__(
             host=host,
             port=port,
@@ -210,8 +213,14 @@ async def perform_request(
             **self._http_auth(method, url, query_string, body),
         }

-        start = self.loop.time()
+        calculate_service_time = False
+        if "calculate_service_time" in self.kwargs:
+            calculate_service_time = self.kwargs["calculate_service_time"]
+
+        time_metrics = TimeMetrics()
+
         try:
+            time_metrics.events.server_request_start()
             async with self.session.request(
                 method,
                 url,
@@ -226,7 +235,7 @@ async def perform_request(
                     raw_data = ""
                 else:
                     raw_data = await response.text()
-            duration = self.loop.time() - start
+            time_metrics.events.server_request_end()

         # We want to reraise a cancellation or recursion error.
         except reraise_exceptions:
@@ -237,7 +246,7 @@ async def perform_request(
                 str(url),
                 url_path,
                 orig_body,
-                self.loop.time() - start,
+                time.perf_counter() - time_metrics.start_time,
                 exception=e,
             )
             if isinstance(e, aiohttp_exceptions.ServerFingerprintMismatch):
@@ -259,17 +268,31 @@ async def perform_request(
                 str(url),
                 url_path,
                 orig_body,
-                duration,
+                time_metrics.service_time,
                 status_code=response.status,
                 response=raw_data,
             )
             self._raise_error(response.status, raw_data)

         self.log_request_success(
-            method, str(url), url_path, orig_body, response.status, raw_data, duration
+            method,
+            str(url),
+            url_path,
+            orig_body,
+            response.status,
+            raw_data,
+            time_metrics.service_time,
         )

-        return response.status, response.headers, raw_data
+        if calculate_service_time:
+            return (
+                response.status,
+                response.headers,
+                raw_data,
+                time_metrics.service_time,
+            )
+        else:
+            return response.status, response.headers, raw_data

     async def close(self) -> Any:
         """
diff --git a/opensearchpy/connection/http_requests.py b/opensearchpy/connection/http_requests.py
index 9bf83004..ddfd0cad 100644
--- a/opensearchpy/connection/http_requests.py
+++ b/opensearchpy/connection/http_requests.py
@@ -29,6 +29,8 @@
 import warnings
 from typing import Any, Collection, Mapping, Optional, Union

+from opensearchpy.metrics import TimeMetrics
+
 try:
     import requests

@@ -88,6 +90,7 @@ def __init__(
         pool_maxsize: Any = None,
         **kwargs: Any
     ) -> None:
+        self.kwargs = kwargs
         if not REQUESTS_AVAILABLE:
             raise ImproperlyConfigured(
                 "Please install requests to use RequestsHttpConnection."
@@ -176,7 +179,6 @@ def perform_request(  # type: ignore
             body = self._gzip_compress(body)
             headers["content-encoding"] = "gzip"  # type: ignore

-        start = time.time()
         request = requests.Request(method=method, headers=headers, url=url, data=body)
         prepared_request = self.session.prepare_request(request)
         settings = self.session.merge_environment_settings(
@@ -187,9 +189,17 @@ def perform_request(  # type: ignore
             "allow_redirects": allow_redirects,
         }
         send_kwargs.update(settings)
+
+        calculate_service_time = False
+        if "calculate_service_time" in self.kwargs:
+            calculate_service_time = self.kwargs["calculate_service_time"]
+
+        time_metrics = TimeMetrics()
+
         try:
+            time_metrics.events.server_request_start()
             response = self.session.send(prepared_request, **send_kwargs)
-            duration = time.time() - start
+            time_metrics.events.server_request_end()
             raw_data = response.content.decode("utf-8", "surrogatepass")
         except reraise_exceptions:
             raise
@@ -199,7 +209,7 @@ def perform_request(  # type: ignore
                 url,
                 prepared_request.path_url,
                 orig_body,
-                time.time() - start,
+                time.perf_counter() - time_metrics.start_time,
                 exception=e,
             )
             if isinstance(e, requests.exceptions.SSLError):
@@ -224,7 +234,7 @@ def perform_request(  # type: ignore
                 url,
                 response.request.path_url,
                 orig_body,
-                duration,
+                time_metrics.service_time,
                 response.status_code,
                 raw_data,
             )
@@ -241,10 +251,17 @@ def perform_request(  # type: ignore
             orig_body,
             response.status_code,
             raw_data,
-            duration,
+            time_metrics.service_time,
         )
-
-        return response.status_code, response.headers, raw_data
+        if calculate_service_time:
+            return (
+                response.status_code,
+                response.headers,
+                raw_data,
+                time_metrics.service_time,
+            )
+        else:
+            return response.status_code, response.headers, raw_data

     @property
     def headers(self) -> Any:  # type: ignore
diff --git a/opensearchpy/connection/http_urllib3.py b/opensearchpy/connection/http_urllib3.py
index ab9a1a78..88259930 100644
--- a/opensearchpy/connection/http_urllib3.py
+++ b/opensearchpy/connection/http_urllib3.py
@@ -34,6 +34,8 @@
 from urllib3.exceptions import SSLError as UrllibSSLError
 from urllib3.util.retry import Retry

+from opensearchpy.metrics import TimeMetrics
+
 from ..compat import reraise_exceptions, urlencode
 from ..exceptions import (
     ConnectionError,
@@ -117,6 +119,7 @@ def __init__(
         opaque_id: Any = None,
         **kwargs: Any
     ) -> None:
+        self.kwargs = kwargs
         # Initialize headers before calling super().__init__().
         self.headers = urllib3.make_headers(keep_alive=True)

@@ -242,8 +245,14 @@ def perform_request(

         full_url = self.host + url

-        start = time.time()
         orig_body = body
+
+        calculate_service_time = False
+        if "calculate_service_time" in self.kwargs:
+            calculate_service_time = self.kwargs["calculate_service_time"]
+
+        time_metrics = TimeMetrics()
+
         try:
             kw = {}
             if timeout:
@@ -267,17 +276,22 @@ def perform_request(
             if self.http_auth is not None:
                 if isinstance(self.http_auth, Callable):  # type: ignore
                     request_headers.update(self.http_auth(method, full_url, body))
-
+            time_metrics.events.server_request_start()
             response = self.pool.urlopen(
                 method, url, body, retries=Retry(False), headers=request_headers, **kw
             )
-            duration = time.time() - start
+            time_metrics.events.server_request_end()
             raw_data = response.data.decode("utf-8", "surrogatepass")
         except reraise_exceptions:
             raise
         except Exception as e:
             self.log_request_fail(
-                method, full_url, url, orig_body, time.time() - start, exception=e
+                method,
+                full_url,
+                url,
+                orig_body,
+                time.perf_counter() - time_metrics.start_time,
+                exception=e,
             )
             if isinstance(e, UrllibSSLError):
                 raise SSLError("N/A", str(e), e)
@@ -292,7 +306,13 @@ def perform_request(
         # raise errors based on http status codes, let the client handle those if needed
         if not (200 <= response.status < 300) and response.status not in ignore:
             self.log_request_fail(
-                method, full_url, url, orig_body, duration, response.status, raw_data
+                method,
+                full_url,
+                url,
+                orig_body,
+                time_metrics.service_time,
+                response.status,
+                raw_data,
             )
             self._raise_error(
                 response.status,
@@ -301,10 +321,24 @@ def perform_request(
             )

         self.log_request_success(
-            method, full_url, url, orig_body, response.status, raw_data, duration
+            method,
+            full_url,
+            url,
+            orig_body,
+            response.status,
+            raw_data,
+            time_metrics.service_time,
         )

-        return response.status, response.headers, raw_data
+        if calculate_service_time:
+            return (
+                response.status,
+                response.headers,
+                raw_data,
+                time_metrics.service_time,
+            )
+        else:
+            return response.status, response.headers, raw_data

     def get_response_headers(self, response: Any) -> Any:
         return {header.lower(): value for header, value in response.headers.items()}
diff --git a/opensearchpy/metrics.py b/opensearchpy/metrics.py
new file mode 100644
index 00000000..6b445de7
--- /dev/null
+++ b/opensearchpy/metrics.py
@@ -0,0 +1,32 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+
+import time
+
+from events import Events
+
+
+class TimeMetrics:
+    def __init__(self):
+        self.events = Events()
+        self.start_time = 0
+        self.end_time = 0
+        self.service_time = 0
+
+        # Subscribe to the server_request_start and server_request_end events
+        self.events.server_request_start += self.server_request_start
+        self.events.server_request_end += self.server_request_end
+
+    def server_request_start(self):
+        self.start_time = time.perf_counter()
+
+    def server_request_end(self):
+        self.end_time = time.perf_counter()
+        self.service_time = self.end_time - self.start_time
diff --git a/opensearchpy/transport.py b/opensearchpy/transport.py
index f582a3be..8aaa72e6 100644
--- a/opensearchpy/transport.py
+++ b/opensearchpy/transport.py
@@ -401,19 +401,39 @@ def perform_request(
             method, params, body
         )

+        calculate_service_time = False
+        if "calculate_service_time" in self.kwargs:
+            calculate_service_time = self.kwargs["calculate_service_time"]
+
         for attempt in range(self.max_retries + 1):
             connection = self.get_connection()

             try:
-                status, headers_response, data = connection.perform_request(
-                    method,
-                    url,
-                    params,
-                    body,
-                    headers=headers,
-                    ignore=ignore,
-                    timeout=timeout,
-                )
+                if calculate_service_time:
+                    (
+                        status,
+                        headers_response,
+                        data,
+                        service_time,
+                    ) = connection.perform_request(
+                        method,
+                        url,
+                        params,
+                        body,
+                        headers=headers,
+                        ignore=ignore,
+                        timeout=timeout,
+                    )
+                else:
+                    status, headers_response, data = connection.perform_request(
+                        method,
+                        url,
+                        params,
+                        body,
+                        headers=headers,
+                        ignore=ignore,
+                        timeout=timeout,
+                    )

                 # Lowercase all the header names for consistency in accessing them.
                 headers_response = {
@@ -457,6 +477,10 @@ def perform_request(
                 data = self.deserializer.loads(
                     data, headers_response.get("content-type")
                 )
+
+            if calculate_service_time:
+                data["client_metrics"] = {"service_time": service_time}
+
             return data

     def close(self) -> Any:
diff --git a/setup.py b/setup.py
index 057fda18..d9616d83 100644
--- a/setup.py
+++ b/setup.py
@@ -59,6 +59,7 @@
     "six",
     "python-dateutil",
     "certifi>=2022.12.07",
+    "Events",
 ]
 tests_require = [
     "requests>=2.0.0, <3.0.0",
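
Below is a minimal usage sketch (not part of the diff) showing how the new timing would surface to a caller. It assumes the client constructor forwards extra kwargs such as calculate_service_time through to the Transport and connection classes, which the hunks above rely on via self.kwargs; the host, port, and response fields other than client_metrics are placeholders.

from opensearchpy import OpenSearch

# Hypothetical client setup; calculate_service_time is the kwarg checked in the hunks above.
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    calculate_service_time=True,
)

info = client.info()
# When enabled, the transport augments the deserialized response with client-side timing.
print(info.get("client_metrics", {}).get("service_time"))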