diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ad4a299..d9e936a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added a log collection guide ([#579](https://github.com/opensearch-project/opensearch-py/pull/579)) - Added GHA release ([#614](https://github.com/opensearch-project/opensearch-py/pull/614)) - Incorporated API generation into CI workflow and fixed 'generate' nox session ([#660](https://github.com/opensearch-project/opensearch-py/pull/660)) +- Introduced `service time` metrics to opensearch-py client ([#689](https://github.com/opensearch-project/opensearch-py/pull/689)) - Added an automated api update bot for opensearch-py ([#664](https://github.com/opensearch-project/opensearch-py/pull/664)) ### Changed - Updated the `get_policy` API in the index_management plugin to allow the policy_id argument as optional ([#633](https://github.com/opensearch-project/opensearch-py/pull/633)) diff --git a/dev-requirements.txt b/dev-requirements.txt index c41d953c..487c1271 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -8,6 +8,7 @@ sphinx_rtd_theme jinja2 pytz deepmerge +Events # No wheels for Python 3.10 yet! numpy; python_version<"3.10" diff --git a/opensearchpy/client/client.py b/opensearchpy/client/client.py index 091bb5e9..f2b77ca4 100644 --- a/opensearchpy/client/client.py +++ b/opensearchpy/client/client.py @@ -10,6 +10,7 @@ from typing import Any, Optional, Type from opensearchpy.client.utils import _normalize_hosts +from opensearchpy.metrics import Metrics from opensearchpy.transport import Transport @@ -22,6 +23,7 @@ def __init__( self, hosts: Optional[str] = None, transport_class: Type[Transport] = Transport, + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: """ @@ -38,4 +40,6 @@ class as kwargs, or a string in the format of ``host[:port]`` which will be :class:`~opensearchpy.Transport` class and, subsequently, to the :class:`~opensearchpy.Connection` instances. """ - self.transport = transport_class(_normalize_hosts(hosts), **kwargs) + self.transport = transport_class( + _normalize_hosts(hosts), metrics=metrics, **kwargs + ) diff --git a/opensearchpy/connection/http_requests.py b/opensearchpy/connection/http_requests.py index 9bf83004..1f94d4b0 100644 --- a/opensearchpy/connection/http_requests.py +++ b/opensearchpy/connection/http_requests.py @@ -36,6 +36,8 @@ except ImportError: REQUESTS_AVAILABLE = False +from opensearchpy.metrics import Metrics + from ..compat import reraise_exceptions, string_types, urlencode from ..exceptions import ( ConnectionError, @@ -86,8 +88,10 @@ def __init__( http_compress: Any = None, opaque_id: Any = None, pool_maxsize: Any = None, + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: + self.metrics = metrics if not REQUESTS_AVAILABLE: raise ImproperlyConfigured( "Please install requests to use RequestsHttpConnection." @@ -188,7 +192,11 @@ def perform_request( # type: ignore } send_kwargs.update(settings) try: + if self.metrics is not None: + self.metrics.request_start() response = self.session.send(prepared_request, **send_kwargs) + if self.metrics is not None: + self.metrics.request_end() duration = time.time() - start raw_data = response.content.decode("utf-8", "surrogatepass") except reraise_exceptions: @@ -244,7 +252,15 @@ def perform_request( # type: ignore duration, ) - return response.status_code, response.headers, raw_data + if self.metrics is None: + return response.status_code, response.headers, raw_data + else: + return ( + response.status_code, + response.headers, + raw_data, + self.metrics.service_time, + ) @property def headers(self) -> Any: # type: ignore diff --git a/opensearchpy/connection/http_urllib3.py b/opensearchpy/connection/http_urllib3.py index ab9a1a78..c17d949d 100644 --- a/opensearchpy/connection/http_urllib3.py +++ b/opensearchpy/connection/http_urllib3.py @@ -34,6 +34,8 @@ from urllib3.exceptions import SSLError as UrllibSSLError from urllib3.util.retry import Retry +from opensearchpy.metrics import Metrics + from ..compat import reraise_exceptions, urlencode from ..exceptions import ( ConnectionError, @@ -115,8 +117,10 @@ def __init__( ssl_context: Any = None, http_compress: Any = None, opaque_id: Any = None, + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: + self.metrics = metrics # Initialize headers before calling super().__init__(). self.headers = urllib3.make_headers(keep_alive=True) @@ -267,10 +271,13 @@ def perform_request( if self.http_auth is not None: if isinstance(self.http_auth, Callable): # type: ignore request_headers.update(self.http_auth(method, full_url, body)) - + if self.metrics is not None: + self.metrics.request_start() response = self.pool.urlopen( method, url, body, retries=Retry(False), headers=request_headers, **kw ) + if self.metrics is not None: + self.metrics.request_end() duration = time.time() - start raw_data = response.data.decode("utf-8", "surrogatepass") except reraise_exceptions: @@ -304,7 +311,15 @@ def perform_request( method, full_url, url, orig_body, response.status, raw_data, duration ) - return response.status, response.headers, raw_data + if self.metrics is None: + return response.status, response.headers, raw_data + else: + return ( + response.status, + response.headers, + raw_data, + self.metrics.service_time, + ) def get_response_headers(self, response: Any) -> Any: return {header.lower(): value for header, value in response.headers.items()} diff --git a/opensearchpy/metrics.py b/opensearchpy/metrics.py new file mode 100644 index 00000000..9f87a144 --- /dev/null +++ b/opensearchpy/metrics.py @@ -0,0 +1,65 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +import time +from abc import ABC, abstractmethod + +from events import Events + + +class Metrics(ABC): + @abstractmethod + def request_start(self) -> None: + pass + + @abstractmethod + def request_end(self) -> None: + pass + + @property + @abstractmethod + def start_time(self) -> float: + pass + + @property + @abstractmethod + def service_time(self) -> float: + pass + + +class MetricsEvents(Metrics): + @property + def start_time(self) -> float: + return self._start_time + + @property + def service_time(self) -> float: + return self._service_time + + def __init__(self) -> None: + self.events = Events() + self._start_time = 0.0 + self._service_time = 0.0 + + # Subscribe to the request_start and request_end events + self.events.request_start += self._on_request_start + self.events.request_end += self._on_request_end + + def request_start(self) -> None: + self.events.request_start() + + def _on_request_start(self) -> None: + self._start_time = time.perf_counter() + + def request_end(self) -> None: + self.events.request_end() + + def _on_request_end(self) -> None: + self._end_time = time.perf_counter() + self._service_time = self._end_time - self._start_time diff --git a/opensearchpy/transport.py b/opensearchpy/transport.py index f582a3be..86c59470 100644 --- a/opensearchpy/transport.py +++ b/opensearchpy/transport.py @@ -29,6 +29,8 @@ from itertools import chain from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union +from opensearchpy.metrics import Metrics + from .connection import Connection, Urllib3HttpConnection from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool from .exceptions import ( @@ -91,6 +93,7 @@ class Transport(object): last_sniff: float sniff_timeout: Optional[float] host_info_callback: Any + metrics: Optional[Metrics] def __init__( self, @@ -112,6 +115,7 @@ def __init__( retry_on_status: Collection[int] = (502, 503, 504), retry_on_timeout: bool = False, send_get_body_as: str = "GET", + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: """ @@ -153,6 +157,7 @@ def __init__( when creating and instance unless overridden by that connection's options provided as part of the hosts parameter. """ + self.metrics = metrics if connection_class is None: connection_class = self.DEFAULT_CONNECTION_CLASS @@ -242,7 +247,7 @@ def _create_connection(host: Any) -> Any: kwargs.update(host) if self.pool_maxsize and isinstance(self.pool_maxsize, int): kwargs["pool_maxsize"] = self.pool_maxsize - return self.connection_class(**kwargs) + return self.connection_class(metrics=self.metrics, **kwargs) connections = list(zip(map(_create_connection, hosts), hosts)) if len(connections) == 1: @@ -405,15 +410,31 @@ def perform_request( connection = self.get_connection() try: - status, headers_response, data = connection.perform_request( - method, - url, - params, - body, - headers=headers, - ignore=ignore, - timeout=timeout, - ) + if self.metrics: + ( + status, + headers_response, + data, + service_time, + ) = connection.perform_request( + method, + url, + params, + body, + headers=headers, + ignore=ignore, + timeout=timeout, + ) + else: + status, headers_response, data = connection.perform_request( + method, + url, + params, + body, + headers=headers, + ignore=ignore, + timeout=timeout, + ) # Lowercase all the header names for consistency in accessing them. headers_response = { @@ -457,6 +478,10 @@ def perform_request( data = self.deserializer.loads( data, headers_response.get("content-type") ) + + if self.metrics: + data["client_metrics"] = {"service_time": service_time} + return data def close(self) -> Any: diff --git a/setup.py b/setup.py index 057fda18..d9616d83 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,7 @@ "six", "python-dateutil", "certifi>=2022.12.07", + "Events", ] tests_require = [ "requests>=2.0.0, <3.0.0",