-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(instrumentation): add OpenTelemetry tracing and metrics with basic configurations #5175
Changes from 2 commits
a5a7f42
9e1b2d0
514792a
c3b0c37
2269b57
14cb744
f53be22
a4a4621
78efb44
7116e9f
92d3679
eb0ccd3
38cae61
45d1794
b107f80
cd17588
2e44270
a083146
30ee9e3
3998e2f
30409c2
e2ee862
a0bfaf8
60be044
9da9eaf
adb96ba
0af8ffb
a241f62
528e38b
47ed0a8
3f436da
578e882
aa5a34a
87c15f5
f7b4af4
3a2e1de
82dad9c
42d00e6
9ade3b6
4132396
4efbbd7
8e9abcb
8eed211
175a399
030b980
c686498
6d21a3a
00c6c12
92c0e1f
6e27829
366a20e
822b541
963b82d
6433930
ffadb73
3f6eeff
6b35909
2906369
f1ad7a2
d7bb8d9
5e31dca
c23f30a
bcc39a8
c540628
2ce9c67
adcb457
0ae5f99
b45de43
bbd2fb8
222cfb9
dcf7296
57be55e
7266abc
e9e78ae
3656afc
4f83c47
a9d5b1b
a706480
ef4a232
1c0aedd
c292234
01d543b
4afc51b
70146e4
7f20c06
550a975
b644004
d55d86c
132a932
bb0b003
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,58 +1 @@ | ||
from jina.serve.runtimes.gateway.grpc.gateway import GRPCGateway | ||
|
||
__all__ = ['GRPCGatewayRuntime'] | ||
|
||
|
||
class GRPCGatewayRuntime(GatewayRuntime): | ||
"""Gateway Runtime for gRPC.""" | ||
|
||
async def async_setup(self): | ||
""" | ||
The async method to setup. | ||
|
||
Create the gRPC server and expose the port for communication. | ||
""" | ||
if not self.args.proxy and os.name != 'nt': | ||
os.unsetenv('http_proxy') | ||
os.unsetenv('https_proxy') | ||
|
||
if not (is_port_free(__default_host__, self.args.port)): | ||
raise PortAlreadyUsed(f'port:{self.args.port}') | ||
|
||
self.gateway = GRPCGateway( | ||
name=self.name, | ||
grpc_server_options=self.args.grpc_server_options, | ||
grpc_tracing_server_interceptors=self.aio_tracing_server_interceptor(), | ||
port=self.args.port, | ||
ssl_keyfile=self.args.ssl_keyfile, | ||
ssl_certfile=self.args.ssl_certfile, | ||
) | ||
|
||
self.gateway.set_streamer( | ||
args=self.args, | ||
timeout_send=self.timeout_send, | ||
metrics_registry=self.metrics_registry, | ||
runtime_name=self.name, | ||
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors( | ||
self.tracer | ||
), | ||
tracing_client_interceptor=self.tracing_client_interceptor( | ||
self.tracer_provider | ||
), | ||
) | ||
await self.gateway.setup_server() | ||
|
||
async def async_teardown(self): | ||
"""Close the connection pool""" | ||
# usually async_cancel should already have been called, but then its a noop | ||
# if the runtime is stopped without a sigterm (e.g. as a context manager, this can happen) | ||
await self.gateway.teardown() | ||
await self.async_cancel() | ||
|
||
async def async_cancel(self): | ||
"""The async method to stop server.""" | ||
await self.gateway.stop_server() | ||
|
||
async def async_run_forever(self): | ||
"""The async running of server.""" | ||
await self.gateway.run_server() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,71 +1 @@ | ||
import asyncio | ||
import os | ||
|
||
from jina import __default_host__ | ||
from jina.serve.runtimes.gateway import GatewayRuntime | ||
from jina.serve.runtimes.gateway.http.app import get_fastapi_app | ||
|
||
__all__ = ['HTTPGatewayRuntime'] | ||
|
||
from jina.serve.runtimes.gateway.http.gateway import HTTPGateway | ||
|
||
|
||
class HTTPGatewayRuntime(GatewayRuntime): | ||
"""Runtime for HTTP interface.""" | ||
|
||
async def async_setup(self): | ||
""" | ||
The async method setup the runtime. | ||
|
||
Setup the uvicorn server. | ||
""" | ||
self.gateway = HTTPGateway( | ||
name=self.name, | ||
port=self.args.port, | ||
title=self.args.title, | ||
description=self.args.description, | ||
no_debug_endpoints=self.args.no_debug_endpoints, | ||
no_crud_endpoints=self.args.no_crud_endpoints, | ||
expose_endpoints=self.args.expose_endpoints, | ||
expose_graphql_endpoint=self.args.expose_graphql_endpoint, | ||
cors=self.args.cors, | ||
ssl_keyfile=self.args.ssl_keyfile, | ||
ssl_certfile=self.args.ssl_certfile, | ||
uvicorn_kwargs=self.args.uvicorn_kwargs, | ||
opentelemetry_tracing=self.opentelemetry_tracing, | ||
tracer_provider=self.tracer_provider, | ||
) | ||
|
||
self.gateway.set_streamer( | ||
args=self.args, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why was this removed ? this can result in failing tests |
||
timeout_send=self.timeout_send, | ||
metrics_registry=self.metrics_registry, | ||
runtime_name=self.args.name, | ||
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors( | ||
self.tracer | ||
), | ||
tracing_client_interceptor=self.tracing_client_interceptor( | ||
self.tracer_provider | ||
), | ||
) | ||
await self.gateway.setup_server() | ||
|
||
async def _wait_for_cancel(self): | ||
"""Do NOT override this method when inheriting from :class:`GatewayPod`""" | ||
# handle terminate signals | ||
while not self.is_cancel.is_set() and not self.gateway.should_exit: | ||
await asyncio.sleep(0.1) | ||
|
||
await self.async_cancel() | ||
|
||
async def async_teardown(self): | ||
"""Shutdown the server.""" | ||
await self.gateway.teardown() | ||
|
||
async def async_cancel(self): | ||
"""Stop the server.""" | ||
await self.gateway.stop_server() | ||
|
||
async def async_run_forever(self): | ||
"""Running method of the server.""" | ||
await self.gateway.run_server() | ||
from .gateway import HTTPGateway | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use absolute import There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not my change. Using absolute import won't work. Maybe @alaeddine-13 can explain. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I probably missed this, but I believe it's still possible, it does not produce circular imports for other gateways
JoanFM marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,57 +1,8 @@ | ||
from jina.serve.runtimes.gateway.websocket.gateway import WebSocketGateway | ||
|
||
|
||
class WebSocketGatewayRuntime(GatewayRuntime): | ||
"""Runtime for Websocket interface.""" | ||
|
||
async def async_setup(self): | ||
""" | ||
The async method setup the runtime. | ||
|
||
Setup the uvicorn server. | ||
""" | ||
import asyncio | ||
|
||
self.gateway = WebSocketGateway( | ||
name=self.name, | ||
port=self.args.port, | ||
ssl_keyfile=self.args.ssl_keyfile, | ||
ssl_certfile=self.args.ssl_certfile, | ||
uvicorn_kwargs=self.args.uvicorn_kwargs, | ||
logger=self.logger, | ||
opentelemetry_tracing=self.opentelemetry_tracing, | ||
tracer_provider=self.tracer_provider, | ||
) | ||
from jina.serve.runtimes.gateway import GatewayRuntime | ||
from jina.serve.runtimes.gateway.websocket.app import get_fastapi_app | ||
|
||
self.gateway.set_streamer( | ||
args=self.args, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as in grpc gateway runtime |
||
timeout_send=self.timeout_send, | ||
metrics_registry=self.metrics_registry, | ||
runtime_name=self.args.name, | ||
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors( | ||
self.tracer | ||
), | ||
tracing_client_interceptor=self.tracing_client_interceptor( | ||
self.tracer_provider | ||
), | ||
) | ||
await self.gateway.setup_server() | ||
__all__ = ['WebSocketGatewayRuntime'] | ||
|
||
async def _wait_for_cancel(self): | ||
"""Do NOT override this method when inheriting from :class:`GatewayPod`""" | ||
# handle terminate signals | ||
while not self.is_cancel.is_set() and not self.gateway.should_exit: | ||
await asyncio.sleep(0.1) | ||
|
||
await self.async_cancel() | ||
|
||
async def async_teardown(self): | ||
"""Shutdown the server.""" | ||
await self.gateway.teardown() | ||
|
||
async def async_cancel(self): | ||
"""Stop the server.""" | ||
await self.gateway.stop_server() | ||
|
||
async def async_run_forever(self): | ||
"""Running method of ther server.""" | ||
await self.gateway.run_server() | ||
from jina.serve.runtimes.gateway.websocket.gateway import WebSocketGateway |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused imports