Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instrumentation): add OpenTelemetry tracing and metrics with basic configurations #5175

Merged
merged 90 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
a5a7f42
feat(instrumentation): create basic tracer and meter with console exp…
Sep 15, 2022
9e1b2d0
style: fix overload and cli autocomplete
jina-bot Sep 15, 2022
514792a
feat(instrumentation): move the instrumentation package to the serve …
Sep 16, 2022
c3b0c37
feat(instrumentation): provide options to enable tracing and metrics …
Sep 16, 2022
2269b57
feat(instrumentation): add the correct grpc opentelmetery insturmenta…
Sep 19, 2022
14cb744
feat(serve): instrument grpc server and channel with interceptors
Sep 19, 2022
f53be22
style: fix overload and cli autocomplete
jina-bot Sep 19, 2022
a4a4621
feat(instrumentation): provide opentelemety context from the grpc cli…
Sep 20, 2022
78efb44
feat(instrumentation): check for opentelemetry environment variables …
Sep 20, 2022
7116e9f
feat(instrumentation): create InstrumentationMixin for server and cli…
Sep 20, 2022
92d3679
chore(instrumentation): use absolute module import
Sep 21, 2022
eb0ccd3
feat(instrumentation): trace http and websocket server and clients
Sep 21, 2022
38cae61
chore(instrumentation): update/add new opentelemetry arguments
Sep 21, 2022
45d1794
feat(instrumentation): globally disable tracing health check requests
Sep 21, 2022
b107f80
feat(instrumentation): add InstrumentationMixIn for Head and Worker r…
Sep 22, 2022
cd17588
feat(instrumentation): disable tracing of ServerReflection and endpoi…
Sep 26, 2022
2e44270
test(instrumentation): add basic tracing and metrics tests for HTTP G…
Sep 26, 2022
a083146
test(instrumentation): move test common code for tracing and metrics …
Sep 26, 2022
30ee9e3
feat(instrumentation): enable tracing of flow internal and start up r…
Sep 26, 2022
3998e2f
test(instrumentation): move test common code to new base class
Sep 26, 2022
30409c2
test(instrumentation): test grpc gateway opentelemety instrumentation
Sep 26, 2022
e2ee862
feat(instrumentation): add Jaeger export agent and required configura…
Sep 27, 2022
a0bfaf8
chore(instrumentation): remove print statement
Sep 27, 2022
60be044
test(instrumentation): document spans in the grpc and http gateway in…
Sep 27, 2022
9da9eaf
Merge branch 'master' into feat-instrumentation-5155
Sep 27, 2022
adb96ba
style: fix overload and cli autocomplete
jina-bot Sep 27, 2022
0af8ffb
chore: remove print statement
Sep 27, 2022
a241f62
test(instrumentation): add instrumentaiton tests for websocket gateway
Sep 27, 2022
528e38b
fix: import openetelmetry api globally and the other dependencies onl…
Sep 27, 2022
47ed0a8
fix: use class name as default name when creating Executor instrument…
Sep 27, 2022
3f436da
fix: provide argparse arguments to AlternativeGateway
Sep 27, 2022
578e882
style: fix overload and cli autocomplete
jina-bot Sep 27, 2022
aa5a34a
style: fix overload and cli autocomplete
Sep 28, 2022
87c15f5
Merge branch 'master' into feat-instrumentation-5155
Sep 28, 2022
f7b4af4
style: fix overload and cli autocomplete
jina-bot Sep 28, 2022
3a2e1de
style: fix overload and cli autocomplete
Sep 28, 2022
82dad9c
style: fix overload and cli autocomplete
jina-bot Sep 28, 2022
42d00e6
fix: revert changes for Gateway implementation
Sep 29, 2022
9ade3b6
Merge branch 'master' into feat-instrumentation-5155
Sep 29, 2022
4132396
feat(instrumentation): remove init method from InstrumentationMixin
Sep 29, 2022
4efbbd7
feat(instrumentation): create vendor neutral opentelemetry export arg…
Sep 29, 2022
8e9abcb
style: fix overload and cli autocomplete
Sep 29, 2022
8eed211
feat(instrumentation): inject tracing variables from AsyncLoopRuntime…
Sep 30, 2022
175a399
style: fix overload and cli autocomplete
jina-bot Sep 30, 2022
030b980
feat(instrumentation): configure a OTLP collector for exporting trace…
Sep 30, 2022
c686498
style: fix overload and cli autocomplete
jina-bot Sep 30, 2022
6d21a3a
feat(instrumentation): return None for aio server interceptors if tra…
Oct 4, 2022
00c6c12
test: fix handling of optional args
Oct 5, 2022
92c0e1f
Merge branch 'master' into feat-instrumentation-5155
Oct 5, 2022
6e27829
fix: remove print debug statement
Oct 5, 2022
366a20e
fix: fix gateway class loading
alaeddine-13 Oct 5, 2022
822b541
Merge branch 'feat-instrumentation-5155' of github.com:jina-ai/jina i…
alaeddine-13 Oct 5, 2022
963b82d
feat(instrumentation): fix BaseGateway telemetry dependency injection
Oct 5, 2022
6433930
fix: fix WebsocketGateway loading
alaeddine-13 Oct 5, 2022
ffadb73
fix(instrumentation): correctly handle default executor runtime_args
Oct 5, 2022
3f6eeff
test(instrumentation): add integration tests for grpc, http and webso…
Oct 5, 2022
6b35909
test(instrumentation): parameterize instrumentation tests
Oct 5, 2022
2906369
test(instrumentation): remove outdated tests replaced by parametrized…
Oct 6, 2022
f1ad7a2
fix(instrumentation): fix executor instrumentation setup
Oct 6, 2022
d7bb8d9
fix(instrumentation): force spawn process when running flows in param…
Oct 6, 2022
5e31dca
feat(instrumentation): omit opentelemetry from cli args
Oct 6, 2022
c23f30a
style: fix overload and cli autocomplete
jina-bot Oct 6, 2022
bcc39a8
test: small test refactoring
JoanFM Oct 6, 2022
c540628
Merge branch 'master' into feat-instrumentation-5155
Oct 6, 2022
2ce9c67
style: fix overload and cli autocomplete
Oct 6, 2022
adcb457
style: fix overload and cli autocomplete
jina-bot Oct 6, 2022
0ae5f99
Merge branch 'master' into feat-instrumentation-5155
Oct 6, 2022
b45de43
test: dont set multiprocessing start method to spawn
Oct 6, 2022
bbd2fb8
fix: hide opentelemetry imports
Oct 6, 2022
222cfb9
Merge branch 'master' into feat-instrumentation-5155
JoanFM Oct 6, 2022
dcf7296
fix(runtimes): shutdown instrumentation exporters during teardown
Oct 7, 2022
57be55e
test: spawn processes by default in tests
Oct 7, 2022
7266abc
Merge branch 'feat-instrumentation-5155' of github.com:jina-ai/jina i…
Oct 7, 2022
e9e78ae
fix: provide client and server interceptors only when tracing is ena…
Oct 7, 2022
3656afc
Merge branch 'master' into feat-instrumentation-5155
Oct 7, 2022
4f83c47
fix(serve): correctly handle default instrumentation runtime_args
Oct 7, 2022
a9d5b1b
chore: hide opentelemetry imports under TYPE_CHECKING
Oct 7, 2022
a706480
test: avoid using spawn
JoanFM Oct 7, 2022
ef4a232
fix: add explicit type info and hide imports
Oct 7, 2022
1c0aedd
fix(executors): handle optional runtime_args correctly
Oct 7, 2022
c292234
chore: rename otel_context to tracing_context
Oct 7, 2022
01d543b
feat: use None instead of NoOp tracer and meter implementations
Oct 10, 2022
4afc51b
fix: remove unused import
Oct 10, 2022
70146e4
feat: add default tracing span for DataRequestHandler handle invocation
Oct 10, 2022
7f20c06
test: add test case to verify exception recording in a span
Oct 10, 2022
550a975
fix: use continue_on_error instead of try-except-pass
Oct 10, 2022
b644004
Merge branch 'master' into feat-instrumentation-5155
girishc13 Oct 10, 2022
d55d86c
chore: rename method name to match returning a list
Oct 11, 2022
132a932
fix: rename span_exporter args to traces_exporter
Oct 11, 2022
bb0b003
style: fix overload and cli autocomplete
jina-bot Oct 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/fundamentals/flow/executor-args.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
| `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `string` | `random in [49152, 65535]` |
| `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` |
| `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` |
| `tracing` | If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. | `boolean` | `False` |
| `traces_exporter_host` | If tracing is enabled, this hostname will be used to configure the trace exporter agent. | `string` | `None` |
| `traces_exporter_port` | If tracing is enabled, this port will be used to configure the trace exporter agent. | `number` | `None` |
| `metrics` | If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. | `boolean` | `False` |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the sentence. Isn't it going to overlap with the monitoring ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my intention is to use the same terms as OpenTelemetry. If people read the OpenTelemetry documentation then the terms are aligned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be renamed to traces_exporter_host?

| `metrics_exporter_host` | If tracing is enabled, this hostname will be used to configure the metrics exporter agent. | `string` | `None` |
| `metrics_exporter_port` | If tracing is enabled, this port will be used to configure the metrics exporter agent. | `number` | `None` |
| `install_requirements` | If set, install `requirements.txt` in the Hub Executor bundle to local | `boolean` | `False` |
| `force_update` | If set, always pull the latest Hub Executor bundle even it exists on local | `boolean` | `False` |
| `compression` | The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. | `string` | `None` |
Expand Down
8 changes: 7 additions & 1 deletion docs/fundamentals/flow/gateway-args.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@
| `monitoring` | If set, spawn an http server with a prometheus endpoint to expose metrics | `boolean` | `False` |
| `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `string` | `random in [49152, 65535]` |
| `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` |
| `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` |
| `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` |
| `tracing` | If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. | `boolean` | `False` |
| `traces_exporter_host` | If tracing is enabled, this hostname will be used to configure the trace exporter agent. | `string` | `None` |
| `traces_exporter_port` | If tracing is enabled, this port will be used to configure the trace exporter agent. | `number` | `None` |
| `metrics` | If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. | `boolean` | `False` |
| `metrics_exporter_host` | If tracing is enabled, this hostname will be used to configure the metrics exporter agent. | `string` | `None` |
| `metrics_exporter_port` | If tracing is enabled, this port will be used to configure the metrics exporter agent. | `number` | `None` |
10 changes: 10 additions & 0 deletions extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,17 @@ packaging>=20.0: core
docarray>=0.16.4: core
jina-hubble-sdk>=0.19.0: core
jcloud>=0.0.35: core
opentelemetry-api>=1.12.0: core
opentelemetry-instrumentation-grpc>=0.33b0: core
uvloop: perf,standard,devel
prometheus_client: perf,standard,devel
opentelemetry-sdk>=1.12.0: perf,standard,devel
opentelemetry-exporter-otlp>=1.12.0: perf,standard,devel
opentelemetry-exporter-prometheus>=1.12.0rc1: perf,standard,devel
opentelemetry-semantic-conventions>=0.33b0: perf,standard,devel
opentelemetry-instrumentation-aiohttp-client>=0.33b0: perf,standard,devel
opentelemetry-instrumentation-fastapi>=0.33b0: perf,standard,devel
opentelemetry-exporter-otlp-proto-grpc>=1.13.0: perf,standrad,devel
fastapi>=0.76.0: standard,devel
uvicorn[standard]: standard,devel
docarray[common]>=0.16.3: standard,devel
Expand Down Expand Up @@ -77,3 +86,4 @@ bs4: cicd
jsonschema: cicd
portforward>=0.2.4: cicd
tensorflow>=2.0: cicd
opentelemetry-test-utils>=0.33b0: test
18 changes: 18 additions & 0 deletions jina/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ def Client(
*,
asyncio: Optional[bool] = False,
host: Optional[str] = '0.0.0.0',
metrics: Optional[bool] = False,
metrics_exporter_host: Optional[str] = None,
metrics_exporter_port: Optional[int] = None,
port: Optional[int] = None,
protocol: Optional[str] = 'GRPC',
proxy: Optional[bool] = False,
tls: Optional[bool] = False,
traces_exporter_host: Optional[str] = None,
traces_exporter_port: Optional[int] = None,
tracing: Optional[bool] = False,
**kwargs
) -> Union[
'AsyncWebSocketClient',
Expand All @@ -37,10 +43,16 @@ def Client(

:param asyncio: If set, then the input and output of this Client work in an asynchronous manner.
:param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor.
:param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.
:param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent.
:param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent.
:param port: The port of the Gateway, which the client should connect to.
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param tls: If set, connect to gateway using tls encryption
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
:param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent.
:param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided.
:return: the new Client object

.. # noqa: DAR202
Expand Down Expand Up @@ -81,10 +93,16 @@ def Client(

:param asyncio: If set, then the input and output of this Client work in an asynchronous manner.
:param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor.
:param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.
:param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent.
:param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent.
:param port: The port of the Gateway, which the client should connect to.
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param tls: If set, connect to gateway using tls encryption
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
:param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent.
:param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided.
:return: the new Client object

.. # noqa: DAR102
Expand Down
14 changes: 13 additions & 1 deletion jina/clients/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

InputType = Union[GeneratorSourceType, Callable[..., GeneratorSourceType]]
CallbackFnType = Optional[Callable[[Response], None]]
from jina.serve.instrumentation import InstrumentationMixin


class BaseClient(ABC):
class BaseClient(InstrumentationMixin, ABC):
"""A base client for connecting to the Flow Gateway.

:param args: the Namespace from argparse
Expand All @@ -46,6 +47,17 @@ def __init__(
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
self._inputs = None
self._setup_instrumentation(
girishc13 marked this conversation as resolved.
Show resolved Hide resolved
name=self.args.name
if hasattr(self.args, 'name')
else self.__class__.__name__,
tracing=self.args.tracing,
traces_exporter_host=self.args.traces_exporter_host,
traces_exporter_port=self.args.traces_exporter_port,
metrics=self.args.metrics,
metrics_exporter_host=self.args.metrics_exporter_host,
metrics_exporter_port=self.args.metrics_exporter_port,
)
send_telemetry_event(event='start', obj=self)

@staticmethod
Expand Down
45 changes: 28 additions & 17 deletions jina/clients/base/grpc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
import json
from typing import TYPE_CHECKING, Optional

import grpc
import json

from jina.clients.base import BaseClient
from jina.clients.helper import callback_exec
Expand Down Expand Up @@ -82,19 +82,27 @@ async def _get_results(
# while loop with retries, check in which state the `iterator` remains after failure
options = GrpcConnectionPool.get_default_grpc_options()
if max_attempts > 1:
service_config_json = json.dumps({
"methodConfig": [{
# To apply retry to all methods, put [{}] in the "name" field
"name": [{}],
"retryPolicy": {
"maxAttempts": max_attempts,
"initialBackoff": f"{initial_backoff}s",
"maxBackoff": f"{max_backoff}s",
"backoffMultiplier": {backoff_multiplier},
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED", "INTERNAL"],
},
}]
})
service_config_json = json.dumps(
{
"methodConfig": [
{
# To apply retry to all methods, put [{}] in the "name" field
"name": [{}],
"retryPolicy": {
"maxAttempts": max_attempts,
"initialBackoff": f"{initial_backoff}s",
"maxBackoff": f"{max_backoff}s",
"backoffMultiplier": {backoff_multiplier},
"retryableStatusCodes": [
"UNAVAILABLE",
"DEADLINE_EXCEEDED",
"INTERNAL",
],
},
}
]
}
)
# NOTE: the retry feature will be enabled by default >=v1.40.0
options.append(("grpc.enable_retries", 1))
options.append(("grpc.service_config", service_config_json))
Expand All @@ -104,6 +112,7 @@ async def _get_results(
options=options,
asyncio=True,
tls=self.args.tls,
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(),
) as channel:
stub = jina_pb2_grpc.JinaRPCStub(channel)
self.logger.debug(f'connected to {self.args.host}:{self.args.port}')
Expand Down Expand Up @@ -146,11 +155,13 @@ async def _get_results(
)
raise ConnectionError(my_details)
elif my_code == grpc.StatusCode.INTERNAL:
self.logger.error(f'{msg}\ninternal error on the server side')
self.logger.error(
f'{msg}\ninternal error on the server side'
)
raise err
elif (
my_code == grpc.StatusCode.UNKNOWN
and 'asyncio.exceptions.TimeoutError' in my_details
my_code == grpc.StatusCode.UNKNOWN
and 'asyncio.exceptions.TimeoutError' in my_details
):
raise BadClientInput(
f'{msg}\n'
Expand Down
51 changes: 40 additions & 11 deletions jina/clients/base/helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import random
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

from aiohttp import WSMsgType

Expand All @@ -12,19 +12,25 @@
from jina.types.request.status import StatusMessage

if TYPE_CHECKING:
from opentelemetry import trace

from jina.logging.logger import JinaLogger


class AioHttpClientlet(ABC):
"""aiohttp session manager"""

def __init__(self, url: str,
logger: 'JinaLogger',
max_attempts: int = 1,
initial_backoff: float = 0.5,
max_backoff: float = 0.1,
backoff_multiplier: float = 1.5,
**kwargs) -> None:
def __init__(
self,
url: str,
logger: 'JinaLogger',
max_attempts: int = 1,
initial_backoff: float = 0.5,
max_backoff: float = 0.1,
backoff_multiplier: float = 1.5,
tracer_provider: Optional['trace.TraceProvider'] = None,
**kwargs,
) -> None:
"""HTTP Client to be used with the streamer

:param url: url to send http/websocket request to
Expand All @@ -33,12 +39,19 @@ def __init__(self, url: str,
:param initial_backoff: The first retry will happen with a delay of random(0, initial_backoff)
:param max_backoff: The maximum accepted backoff after the exponential incremental delay
:param backoff_multiplier: The n-th attempt will occur at random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff))
:param tracer_provider: Optional tracer_provider that will be used to configure aiohttp tracing.
:param kwargs: kwargs which will be forwarded to the `aiohttp.Session` instance. Used to pass headers to requests
"""
self.url = url
self.logger = logger
self.msg_recv = 0
self.msg_sent = 0
if tracer_provider:
from opentelemetry.instrumentation.aiohttp_client import create_trace_config

self._trace_config = [create_trace_config(tracer_provider=tracer_provider)]
else:
self._trace_config = None
self.session = None
self._session_kwargs = {}
if kwargs.get('headers', None):
Expand Down Expand Up @@ -90,7 +103,9 @@ async def start(self):
with ImportExtensions(required=True):
import aiohttp

self.session = aiohttp.ClientSession(**self._session_kwargs)
self.session = aiohttp.ClientSession(
**self._session_kwargs, trace_configs=self._trace_config
)
await self.session.__aenter__()
return self

Expand Down Expand Up @@ -125,7 +140,14 @@ async def send_message(self, request: 'Request'):
if retry == self.max_attempts:
raise
else:
wait_time = random.uniform(0, min(self.initial_backoff*self.backoff_multiplier**(retry-1), self.max_backoff))
wait_time = random.uniform(
0,
min(
self.initial_backoff
* self.backoff_multiplier ** (retry - 1),
self.max_backoff,
),
)
await asyncio.sleep(wait_time)

async def send_dry_run(self, **kwargs):
Expand Down Expand Up @@ -194,7 +216,14 @@ async def send_message(self, request: 'Request'):
self.logger.critical(f'server connection closed already!')
raise
else:
wait_time = random.uniform(0, min(self.initial_backoff*self.backoff_multiplier**(retry-1), self.max_backoff))
wait_time = random.uniform(
0,
min(
self.initial_backoff
* self.backoff_multiplier ** (retry - 1),
self.max_backoff,
),
)
await asyncio.sleep(wait_time)

async def send_dry_run(self, **kwargs):
Expand Down
Loading