Skip to content

Commit

Permalink
Introduced service time metrics to OpenSearch-Py client.
Browse files Browse the repository at this point in the history
Signed-off-by: saimedhi <saimedhi@amazon.com>
  • Loading branch information
saimedhi committed Mar 6, 2024
1 parent 4b69c09 commit 3dca007
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 45 deletions.
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sphinx_rtd_theme
jinja2
pytz
deepmerge
Events

# No wheels for Python 3.10 yet!
numpy; python_version<"3.10"
Expand Down
36 changes: 30 additions & 6 deletions opensearchpy/_async/http_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import asyncio
import os
import ssl
import time
import warnings
from typing import Any, Collection, Mapping, Optional, Union

import urllib3

from opensearchpy.metrics import TimeMetrics

from ..compat import reraise_exceptions, urlencode
from ..connection.base import Connection
from ..exceptions import (
Expand Down Expand Up @@ -130,6 +133,7 @@ def __init__(
"""

self.headers = {}
self.kwargs = kwargs

super().__init__(
host=host,
Expand Down Expand Up @@ -291,8 +295,14 @@ async def perform_request(
body = self._gzip_compress(body)
req_headers["content-encoding"] = "gzip"

start = self.loop.time()
calculate_service_time = False
if "calculate_service_time" in self.kwargs:
calculate_service_time = self.kwargs["calculate_service_time"]

time_metrics = TimeMetrics()

try:
time_metrics.events.server_request_start()
async with self.session.request(
method,
url,
Expand All @@ -306,7 +316,7 @@ async def perform_request(
raw_data = ""
else:
raw_data = await response.text()
duration = self.loop.time() - start
time_metrics.events.server_request_end()

# We want to reraise a cancellation or recursion error.
except reraise_exceptions:
Expand All @@ -317,7 +327,7 @@ async def perform_request(
url,
url_path,
orig_body,
self.loop.time() - start,
time.perf_counter() - time_metrics.start_time,
exception=e,
)
if isinstance(e, aiohttp_exceptions.ServerFingerprintMismatch):
Expand All @@ -339,7 +349,7 @@ async def perform_request(
url,
url_path,
orig_body,
duration,
time_metrics.service_time,
status_code=response.status,
response=raw_data,
)
Expand All @@ -350,10 +360,24 @@ async def perform_request(
)

self.log_request_success(
method, url, url_path, orig_body, response.status, raw_data, duration
method,
url,
url_path,
orig_body,
response.status,
raw_data,
time_metrics.service_time,
)

return response.status, response.headers, raw_data
if calculate_service_time:
return (
response.status,
response.headers,
raw_data,
time_metrics.service_time,
)
else:
return response.status, response.headers, raw_data

async def close(self) -> Any:
"""
Expand Down
40 changes: 31 additions & 9 deletions opensearchpy/_async/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,19 +382,39 @@ async def perform_request(
method, params, body
)

calculate_service_time = False
if "calculate_service_time" in self.kwargs:
calculate_service_time = self.kwargs["calculate_service_time"]

for attempt in range(self.max_retries + 1):
connection = self.get_connection()

try:
status, headers_response, data = await connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
if calculate_service_time:
(
status,
headers_response,
data,
service_time,
) = await connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
else:
status, headers_response, data = await connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)

# Lowercase all the header names for consistency in accessing them.
headers_response = {
Expand Down Expand Up @@ -437,6 +457,8 @@ async def perform_request(
data = self.deserializer.loads(
data, headers_response.get("content-type")
)
if calculate_service_time:
data["client_metrics"] = {"service_time": service_time}
return data

async def close(self) -> None:
Expand Down
37 changes: 30 additions & 7 deletions opensearchpy/connection/http_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import asyncio
import os
import ssl
import time
import warnings
from typing import Any, Collection, Mapping, Optional, Union

from opensearchpy.metrics import TimeMetrics

from .._async._extra_imports import aiohttp, aiohttp_exceptions # type: ignore
from .._async.compat import get_running_loop
from .._async.http_aiohttp import AIOHttpConnection
Expand Down Expand Up @@ -55,7 +58,7 @@ def __init__(
**kwargs: Any
) -> None:
self.headers = {}

self.kwargs = kwargs
super().__init__(
host=host,
port=port,
Expand Down Expand Up @@ -210,8 +213,14 @@ async def perform_request(
**self._http_auth(method, url, query_string, body),
}

start = self.loop.time()
calculate_service_time = False
if "calculate_service_time" in self.kwargs:
calculate_service_time = self.kwargs["calculate_service_time"]

time_metrics = TimeMetrics()

try:
time_metrics.events.server_request_start()
async with self.session.request(
method,
url,
Expand All @@ -226,7 +235,7 @@ async def perform_request(
raw_data = ""
else:
raw_data = await response.text()
duration = self.loop.time() - start
time_metrics.events.server_request_end()

# We want to reraise a cancellation or recursion error.
except reraise_exceptions:
Expand All @@ -237,7 +246,7 @@ async def perform_request(
str(url),
url_path,
orig_body,
self.loop.time() - start,
time.perf_counter() - time_metrics.start_time,
exception=e,
)
if isinstance(e, aiohttp_exceptions.ServerFingerprintMismatch):
Expand All @@ -259,17 +268,31 @@ async def perform_request(
str(url),
url_path,
orig_body,
duration,
time_metrics.service_time,
status_code=response.status,
response=raw_data,
)
self._raise_error(response.status, raw_data)

self.log_request_success(
method, str(url), url_path, orig_body, response.status, raw_data, duration
method,
str(url),
url_path,
orig_body,
response.status,
raw_data,
time_metrics.service_time,
)

return response.status, response.headers, raw_data
if calculate_service_time:
return (
response.status,
response.headers,
raw_data,
time_metrics.service_time,
)
else:
return response.status, response.headers, raw_data

async def close(self) -> Any:
"""
Expand Down
31 changes: 24 additions & 7 deletions opensearchpy/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import warnings
from typing import Any, Collection, Mapping, Optional, Union

from opensearchpy.metrics import TimeMetrics

try:
import requests

Expand Down Expand Up @@ -88,6 +90,7 @@ def __init__(
pool_maxsize: Any = None,
**kwargs: Any
) -> None:
self.kwargs = kwargs
if not REQUESTS_AVAILABLE:
raise ImproperlyConfigured(
"Please install requests to use RequestsHttpConnection."
Expand Down Expand Up @@ -176,7 +179,6 @@ def perform_request( # type: ignore
body = self._gzip_compress(body)
headers["content-encoding"] = "gzip" # type: ignore

start = time.time()
request = requests.Request(method=method, headers=headers, url=url, data=body)
prepared_request = self.session.prepare_request(request)
settings = self.session.merge_environment_settings(
Expand All @@ -187,9 +189,17 @@ def perform_request( # type: ignore
"allow_redirects": allow_redirects,
}
send_kwargs.update(settings)

calculate_service_time = False
if "calculate_service_time" in self.kwargs:
calculate_service_time = self.kwargs["calculate_service_time"]

time_metrics = TimeMetrics()

try:
time_metrics.events.server_request_start()
response = self.session.send(prepared_request, **send_kwargs)
duration = time.time() - start
time_metrics.events.server_request_end()
raw_data = response.content.decode("utf-8", "surrogatepass")
except reraise_exceptions:
raise
Expand All @@ -199,7 +209,7 @@ def perform_request( # type: ignore
url,
prepared_request.path_url,
orig_body,
time.time() - start,
time.perf_counter() - time_metrics.start_time,
exception=e,
)
if isinstance(e, requests.exceptions.SSLError):
Expand All @@ -224,7 +234,7 @@ def perform_request( # type: ignore
url,
response.request.path_url,
orig_body,
duration,
time_metrics.service_time,
response.status_code,
raw_data,
)
Expand All @@ -241,10 +251,17 @@ def perform_request( # type: ignore
orig_body,
response.status_code,
raw_data,
duration,
time_metrics.service_time,
)

return response.status_code, response.headers, raw_data
if calculate_service_time:
return (
response.status_code,
response.headers,
raw_data,
time_metrics.service_time,
)
else:
return response.status_code, response.headers, raw_data

@property
def headers(self) -> Any: # type: ignore
Expand Down
Loading

0 comments on commit 3dca007

Please sign in to comment.