From 7b74acf4388d32cadb2b4926ccded4e4eae61b8e Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Thu, 12 Sep 2024 12:00:10 +0530 Subject: [PATCH 01/13] separate signoz cluster --- tox.ini | 50 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/tox.ini b/tox.ini index 126f8f2549b..cf1dad95035 100644 --- a/tox.ini +++ b/tox.ini @@ -1028,22 +1028,39 @@ commands = ; restarts coredns bash -c 'kubectl delete pod -n kube-system -l k8s-app=kube-dns --context k3d-${CLUSTER_NAME}' -[testenv:dev.k8s.install.signoz] -description = Install Signoz on local Kubernetes cluster +[testenv:dev.k8s.start.signoz] +description = Start local Kubernetes registry with Signoz installed changedir = {toxinidir} -passenv=HOME,USER,CLUSTER_NAME -allowlist_externals= +passenv=HOME,USER +allowlist_externals = bash -setenv= - CLUSTER_NAME = {env:CLUSTER_NAME:test-datasite-1} - SIGNOZ_PORT = {env:SIGNOZ_PORT:3301} -commands= - bash -c 'if [ "{posargs}" ]; then kubectl config use-context {posargs}; fi' - bash -c 'helm repo add signoz https://charts.signoz.io && helm repo update' - bash -c 'helm install syft signoz/signoz --namespace platform --create-namespace || true' - # bash -c 'k3d cluster edit ${CLUSTER_NAME} --port-add "${SIGNOZ_PORT}:3301@loadbalancer"' - ; bash packages/grid/scripts/wait_for.sh service syft-signoz-frontend --context k3d-{env:CLUSTER_NAME} --namespace platform - + tox + helm + curl +commands = + bash -c 'k3d cluster create signoz \ + -p "3301:3301@loadbalancer" \ + -p "4317:4317@loadbalancer" \ + --k3s-arg "--disable=metrics-server@server:*"' + + helm install signoz signoz \ + --repo https://charts.signoz.io \ + --namespace platform \ + --create-namespace \ + --version 0.52.0 \ + --set frontend.service.type=LoadBalancer \ + --set otelCollector.service.type=LoadBalancer \ + --set otelCollectorMetrics.service.type=LoadBalancer + + ; wait for signoz frontend + bash -c 'echo Waiting for signoz-frontend; \ + WAIT_TIME=5 source packages/grid/scripts/wait_for.sh service signoz-frontend --namespace platform --context k3d-signoz > /dev/null' + + ; setup defailt account + curl --retry 5 --retry-all-errors -X POST \ + -H "Content-Type: application/json" \ + --data '{"email":"admin@localhost","name":"admin","orgName":"openmined","password":"adminadmin"}' \ + http://localhost:3301/api/v1/register [testenv:dev.k8s.start] description = Start local Kubernetes registry & cluster with k3d @@ -1066,7 +1083,7 @@ commands = -p "${CLUSTER_HTTP_PORT}:80@loadbalancer" \ --registry-use k3d-registry.localhost:5800 {posargs} && \ kubectl --context k3d-${CLUSTER_NAME} create namespace syft || true' - # -p "${SIGNOZ_PORT}:3301@loadbalancer" \ + ; patch coredns tox -e dev.k8s.patch.coredns @@ -1274,6 +1291,9 @@ commands = ; destroy cluster tox -e dev.k8s.destroy + ; destroy cluster + bash -c 'CLUSTER_NAME=signoz tox -e dev.k8s.destroy' + ; destroy registry bash -c 'k3d registry delete registry.localhost || true' bash -c 'docker volume rm k3d-registry-vol --force || true' From 832a2d79a5378ed2e2dd8fc0dad61117fde1add6 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Thu, 12 Sep 2024 17:36:24 +0530 Subject: [PATCH 02/13] install k8s-infra collector on deployments --- tox.ini | 59 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/tox.ini b/tox.ini index cf1dad95035..ef2b038a871 100644 --- a/tox.ini +++ b/tox.ini @@ -1028,21 +1028,18 @@ commands = ; restarts coredns bash -c 'kubectl delete pod -n kube-system -l k8s-app=kube-dns --context k3d-${CLUSTER_NAME}' -[testenv:dev.k8s.start.signoz] -description = Start local Kubernetes registry with Signoz installed +[testenv:dev.k8s.add.signoz] +description = Install Signoz on Kubernetes cluster changedir = {toxinidir} passenv=HOME,USER +setenv = + CLUSTER_NAME = {env:CLUSTER_NAME:syft-dev} allowlist_externals = bash tox helm curl commands = - bash -c 'k3d cluster create signoz \ - -p "3301:3301@loadbalancer" \ - -p "4317:4317@loadbalancer" \ - --k3s-arg "--disable=metrics-server@server:*"' - helm install signoz signoz \ --repo https://charts.signoz.io \ --namespace platform \ @@ -1056,12 +1053,35 @@ commands = bash -c 'echo Waiting for signoz-frontend; \ WAIT_TIME=5 source packages/grid/scripts/wait_for.sh service signoz-frontend --namespace platform --context k3d-signoz > /dev/null' - ; setup defailt account + ; setup dafault account curl --retry 5 --retry-all-errors -X POST \ -H "Content-Type: application/json" \ - --data '{"email":"admin@localhost","name":"admin","orgName":"openmined","password":"adminadmin"}' \ + --data '{"email":"admin@localhost","name":"admin","orgName":"openmined","password":"password"}' \ http://localhost:3301/api/v1/register + bash -c "printf 'Signoz is running on http://localhost:3301\nEmail: \033[1;36madmin@localhost\033[0m\nPassword: \033[1;36mpassword\033[0m\n'" + + +[testenv:dev.k8s.add.collector] +description = Install signoz/k8s-infra on Kubernetes cluster +changedir = {toxinidir} +passenv=HOME,USER,CLUSTER_NAME +setenv = + SIGNOZ_HOST=host.k3d.internal + CLUSTER_NAME = {env:CLUSTER_NAME:syft-dev} +allowlist_externals = + helm +commands = + helm install k8s-infra k8s-infra \ + --repo https://charts.signoz.io \ + --kube-context k3d-{env:CLUSTER_NAME} \ + --set global.deploymentEnvironment=local-dev \ + --set clusterName={env:CLUSTER_NAME} \ + --set otelCollectorEndpoint=http://{env:SIGNOZ_HOST}:4317 \ + --set otelInsecure=true \ + --set presets.otlpExporter.enabled=true \ + --set presets.loggingExporter.enabled=true + [testenv:dev.k8s.start] description = Start local Kubernetes registry & cluster with k3d changedir = {toxinidir} @@ -1087,9 +1107,28 @@ commands = ; patch coredns tox -e dev.k8s.patch.coredns + ; add signoz/collector + tox -e dev.k8s.add.collector + ; dump cluster info tox -e dev.k8s.info +[testenv:dev.k8s.start.signoz] +description = Start local Kubernetes registry with Signoz installed +changedir = {toxinidir} +passenv=HOME,USER +allowlist_externals = + bash + tox +commands = + bash -c 'k3d cluster create signoz \ + -p "3301:3301@loadbalancer" \ + -p "4317:4317@loadbalancer" \ + --k3s-arg "--disable=metrics-server@server:*"' + + ; add signoz + tox -e dev.k8s.add.signoz + [testenv:dev.k8s.deploy] description = Deploy Syft to a local Kubernetes cluster with Devspace changedir = {toxinidir}/packages/grid @@ -1149,7 +1188,7 @@ allowlist_externals = bash commands = bash -c 'devspace purge --force-purge --kube-context k3d-${CLUSTER_NAME} --no-warn --namespace syft; sleep 3' - bash -c 'devspace cleanup images --kube-context k3d-${CLUSTER_NAME} --no-warn --namespace syft --var CONTAINER_REGISTRY=k3d-registry.localhost:5800 || true' + ; bash -c 'devspace cleanup images --kube-context k3d-${CLUSTER_NAME} --no-warn --namespace syft --var CONTAINER_REGISTRY=k3d-registry.localhost:5800 || true' bash -c 'kubectl --context k3d-${CLUSTER_NAME} delete namespace syft --now=true || true' [testenv:dev.k8s.render] From 7efe7075316edf461083f42adeaef8822587c5e6 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Thu, 12 Sep 2024 17:55:32 +0530 Subject: [PATCH 03/13] fix otlpEndpoint --- packages/grid/devspace.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grid/devspace.yaml b/packages/grid/devspace.yaml index ae6245df2c1..0e60e893a4b 100644 --- a/packages/grid/devspace.yaml +++ b/packages/grid/devspace.yaml @@ -132,7 +132,7 @@ profiles: value: tracing: enabled: true - otlpEndpoint: "http://syft-signoz-otel-collector.platform:4317" + otlpEndpoint: "http://host.k3d.internal:4317" otelProtocol: "grpc" - name: bigquery-scenario-tests From bf761c5a51c220f1cc6e0e97cadb9fd4fadacf77 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Mon, 16 Sep 2024 13:10:09 +0530 Subject: [PATCH 04/13] add otel attributes --- packages/grid/backend/backend.dockerfile | 2 +- packages/grid/backend/grid/start.sh | 31 ++++++++++++++++-------- packages/syft/src/syft/util/telemetry.py | 6 ++--- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/packages/grid/backend/backend.dockerfile b/packages/grid/backend/backend.dockerfile index c51ba31c8fd..6511b42fdbc 100644 --- a/packages/grid/backend/backend.dockerfile +++ b/packages/grid/backend/backend.dockerfile @@ -75,7 +75,7 @@ ENV \ APPDIR="/root/app" \ SERVER_NAME="default_server_name" \ SERVER_TYPE="datasite" \ - SERVICE_NAME="backend" \ + SERVER_SIDE_TYPE="high" \ RELEASE="production" \ DEV_MODE="False" \ DEBUGGER_ENABLED="False" \ diff --git a/packages/grid/backend/grid/start.sh b/packages/grid/backend/grid/start.sh index ace8d76e455..3c12891a764 100755 --- a/packages/grid/backend/grid/start.sh +++ b/packages/grid/backend/grid/start.sh @@ -7,11 +7,14 @@ APP_MODULE=grid.main:app LOG_LEVEL=${LOG_LEVEL:-info} HOST=${HOST:-0.0.0.0} PORT=${PORT:-80} -SERVER_TYPE=${SERVER_TYPE:-datasite} APPDIR=${APPDIR:-$HOME/app} RELOAD="" ROOT_PROC="" +export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json} +export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key) +export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid) + if [[ ${DEV_MODE} == "True" ]]; then echo "Hot-reload Enabled" @@ -28,20 +31,28 @@ fi if [[ ${TRACING} == "true" ]]; then - # TODOs: - # ? Kubernetes OTel operator is recommended by signoz - export OTEL_PYTHON_LOG_CORRELATION=${OTEL_PYTHON_LOG_CORRELATION:-true} - echo "OpenTelemetry Enabled. Endpoint=$OTEL_EXPORTER_OTLP_ENDPOINT Protocol=$OTEL_EXPORTER_OTLP_PROTOCOL" + # TODO: Polish these values up + DEPLOYMENT_ENV="$SERVER_TYPE-$SERVER_SIDE_TYPE" + RESOURCE_ATTRS=( + "deployment.environment=$DEPLOYMENT_ENV" + "service.namespace=$DEPLOYMENT_ENV" + "service.instance.id=$SERVER_UID" + "k8s.pod.name=${K8S_POD_NAME:-"none"}" + "k8s.namespace.name=${K8S_NAMESPACE:"none"}" + ) + + # environ is always prefixed with the server type + export OTEL_SERVICE_NAME="${DEPLOYMENT_ENV}-${OTEL_SERVICE_NAME:-"backend"}" + export OTEL_RESOURCE_ATTRIBUTES=$(IFS=, ; echo "${RESOURCE_ATTRS[*]}") + + echo "OpenTelemetry Enabled." + env | grep OTEL_ else echo "OpenTelemetry Disabled" fi -export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json} -export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key) -export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid) -export SERVER_TYPE=$SERVER_TYPE - echo "SERVER_UID=$SERVER_UID" echo "SERVER_TYPE=$SERVER_TYPE" +echo "SERVER_SIDE_TYPE=$SERVER_SIDE_TYPE" exec $ROOT_PROC uvicorn $RELOAD --host $HOST --port $PORT --log-config=$APPDIR/grid/logging.yaml "$APP_MODULE" diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index ef376d78fc0..a3af3bb692c 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -36,7 +36,7 @@ def noop_wrapper(__func_or_class: T) -> T: from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( OTLPSpanExporter, ) - from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.resources import OTELResourceDetector from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor @@ -44,8 +44,8 @@ def noop_wrapper(__func_or_class: T) -> T: from .trace_decorator import instrument as _instrument # create a provider - service_name = os.environ.get("OTEL_SERVICE_NAME", "syft-backend") - resource = Resource.create({"service.name": service_name}) + resource = OTELResourceDetector().detect() + logger.info(f"OTEL Resource: {resource}") provider = TracerProvider(resource=resource) # create a span processor From 9d175497789b30157c2e2d7a9f782698c0152473 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Mon, 16 Sep 2024 13:37:48 +0530 Subject: [PATCH 05/13] add syft + proc otel attrs --- packages/grid/backend/grid/start.sh | 3 +++ packages/syft/src/syft/util/telemetry.py | 13 ++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/grid/backend/grid/start.sh b/packages/grid/backend/grid/start.sh index 3c12891a764..79ca493d4c2 100755 --- a/packages/grid/backend/grid/start.sh +++ b/packages/grid/backend/grid/start.sh @@ -39,6 +39,9 @@ then "service.instance.id=$SERVER_UID" "k8s.pod.name=${K8S_POD_NAME:-"none"}" "k8s.namespace.name=${K8S_NAMESPACE:"none"}" + "syft.server.uid=$SERVER_UID" + "syft.server.type=$SERVER_TYPE" + "syft.server.side.type=$SERVER_SIDE_TYPE" ) # environ is always prefixed with the server type diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index a3af3bb692c..7cbe5a5c78c 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -6,6 +6,7 @@ from typing import TypeVar # relative +from .. import __version__ from .util import str_to_bool __all__ = ["TRACING_ENABLED", "instrument"] @@ -37,15 +38,21 @@ def noop_wrapper(__func_or_class: T) -> T: OTLPSpanExporter, ) from opentelemetry.sdk.resources import OTELResourceDetector + from opentelemetry.sdk.resources import ProcessResourceDetector + from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor # relative from .trace_decorator import instrument as _instrument - # create a provider - resource = OTELResourceDetector().detect() - logger.info(f"OTEL Resource: {resource}") + # create a resource + resource = Resource({"syft.version": __version__}) + resource = resource.merge(OTELResourceDetector().detect()) + resource = resource.merge(ProcessResourceDetector()) + logger.info(f"OTEL Resource: {resource.__dict__}") + + # create a trace provider from the resource provider = TracerProvider(resource=resource) # create a span processor From a0ac3cad834b2b376667f6857dadc0d7bc9d17f4 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Mon, 16 Sep 2024 13:46:22 +0530 Subject: [PATCH 06/13] add thread span attrs --- packages/syft/src/syft/util/telemetry.py | 2 +- packages/syft/src/syft/util/trace_decorator.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index 7cbe5a5c78c..b609a0f023b 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -49,7 +49,7 @@ def noop_wrapper(__func_or_class: T) -> T: # create a resource resource = Resource({"syft.version": __version__}) resource = resource.merge(OTELResourceDetector().detect()) - resource = resource.merge(ProcessResourceDetector()) + resource = resource.merge(ProcessResourceDetector().detect()) logger.info(f"OTEL Resource: {resource.__dict__}") # create a trace provider from the resource diff --git a/packages/syft/src/syft/util/trace_decorator.py b/packages/syft/src/syft/util/trace_decorator.py index 719ca822afa..1746fafc095 100644 --- a/packages/syft/src/syft/util/trace_decorator.py +++ b/packages/syft/src/syft/util/trace_decorator.py @@ -6,6 +6,7 @@ from collections.abc import Callable from functools import wraps import inspect +import threading from typing import Any from typing import ClassVar from typing import TypeVar @@ -116,10 +117,13 @@ def span_decorator(func_or_class: T) -> T: tracer = existing_tracer or trace.get_tracer(func_or_class.__module__) def _set_semantic_attributes(span: Span, func: Callable) -> None: + thread = threading.current_thread() span.set_attribute(SpanAttributes.CODE_NAMESPACE, func.__module__) span.set_attribute(SpanAttributes.CODE_FUNCTION, func.__qualname__) span.set_attribute(SpanAttributes.CODE_FILEPATH, func.__code__.co_filename) span.set_attribute(SpanAttributes.CODE_LINENO, func.__code__.co_firstlineno) + span.set_attribute(SpanAttributes.THREAD_ID, thread.ident) + span.set_attribute(SpanAttributes.THREAD_NAME, thread.name) def _set_attributes( span: Span, attributes_dict: dict[str, str] | None = None From 6ff4ca3628d006bebcbaca580c34f246f8590dac Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Mon, 16 Sep 2024 17:03:25 +0530 Subject: [PATCH 07/13] logging fix --- packages/grid/backend/grid/logging.yaml | 29 ++-- packages/grid/backend/grid/main.py | 147 +++++++----------- packages/grid/backend/grid/start.sh | 2 +- packages/syft/src/syft/server/uvicorn.py | 11 +- .../syft/src/syft/service/code/user_code.py | 2 +- .../syft/service/notifier/notifier_service.py | 10 +- .../syft/src/syft/service/queue/zmq_client.py | 7 +- .../syft/src/syft/service/worker/utils.py | 3 - .../src/syft/store/blob_storage/seaweedfs.py | 18 +-- packages/syft/src/syft/store/mongo_client.py | 16 +- packages/syft/src/syft/util/telemetry.py | 87 +++++++---- .../syft/src/syft/util/trace_decorator.py | 17 +- 12 files changed, 173 insertions(+), 176 deletions(-) diff --git a/packages/grid/backend/grid/logging.yaml b/packages/grid/backend/grid/logging.yaml index b41eb783038..ddf4cc4a82b 100644 --- a/packages/grid/backend/grid/logging.yaml +++ b/packages/grid/backend/grid/logging.yaml @@ -1,16 +1,16 @@ version: 1 disable_existing_loggers: True + formatters: default: - format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" - datefmt: "%Y-%m-%d %H:%M:%S" + format: "ROOT %(asctime)s - %(levelname)s - %(name)s - %(message)s" uvicorn.default: "()": uvicorn.logging.DefaultFormatter format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" uvicorn.access: "()": "uvicorn.logging.AccessFormatter" format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" - datefmt: "%Y-%m-%d %H:%M:%S" + handlers: default: formatter: default @@ -24,23 +24,32 @@ handlers: formatter: uvicorn.access class: logging.StreamHandler stream: ext://sys.stdout + loggers: + # uvicorn loggers uvicorn.error: level: INFO handlers: - uvicorn.default - propagate: no + propagate: false uvicorn.access: level: INFO handlers: - uvicorn.access - propagate: no + propagate: false + # syft & grid loggers syft: level: INFO handlers: - default - propagate: no -root: - level: INFO - handlers: - - default + propagate: false + grid: + level: INFO + handlers: + - default + propagate: false + # root logger + # do not set level, else pip packages will be affected + "": + handlers: + - default diff --git a/packages/grid/backend/grid/main.py b/packages/grid/backend/grid/main.py index 12af4165179..1e488644a30 100644 --- a/packages/grid/backend/grid/main.py +++ b/packages/grid/backend/grid/main.py @@ -6,63 +6,93 @@ # third party from fastapi import FastAPI from fastapi.responses import JSONResponse +from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware # syft absolute -from syft.protocol.data_protocol import stage_protocol_changes +from syft.util.telemetry import instrument_fastapi # server absolute from grid.api.router import api_router from grid.core.config import settings from grid.core.server import worker +# logger => grid.main +logger = logging.getLogger(__name__) + + +class FastAPILogFilter(logging.Filter): + HEALTHCHECK_ENDPOINT = f"{settings.API_V2_STR}/?probe=" -class EndpointFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: - return record.getMessage().find("/api/v2/?probe=livenessProbe") == -1 + return record.getMessage().find(self.HEALTHCHECK_ENDPOINT) == -1 + + +def on_app_startup(app: FastAPI) -> None: + if settings.DEV_MODE: + # syft absolute + from syft.protocol.data_protocol import stage_protocol_changes + + logger.info("Staging protocol changes...") + status = stage_protocol_changes() + logger.info(f"Staging protocol result: {status}") + +def on_app_shutdown(app: FastAPI) -> None: + worker.stop() + logger.info("Worker Stopped") -logger = logging.getLogger("uvicorn.error") -logging.getLogger("uvicorn.access").addFilter(EndpointFilter()) + +def get_middlewares() -> FastAPI: + middlewares = [] + + # Set all CORS enabled origins + if settings.BACKEND_CORS_ORIGINS: + middlewares.append( + Middleware( + CORSMiddleware, + allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + ) + + return middlewares @asynccontextmanager async def lifespan(app: FastAPI) -> Any: try: + on_app_startup(app) yield finally: - worker.stop() - logger.info("Worker Stop") + on_app_shutdown(app) -app = FastAPI( - title=settings.PROJECT_NAME, - openapi_url=f"{settings.API_V2_STR}/openapi.json", - lifespan=lifespan, -) +def create_app() -> FastAPI: + app = FastAPI( + title=settings.PROJECT_NAME, + openapi_url=f"{settings.API_V2_STR}/openapi.json", + lifespan=lifespan, + middleware=get_middlewares(), + ) + # instrument app + instrument_fastapi(app) -# Set all CORS enabled origins -if settings.BACKEND_CORS_ORIGINS: - app.add_middleware( - CORSMiddleware, - allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) + # patch logger to ignore healthcheck logs + logging.getLogger("uvicorn.access").addFilter(FastAPILogFilter()) + + # add Syft API routes + app.include_router(api_router, prefix=settings.API_V2_STR) -app.include_router(api_router, prefix=settings.API_V2_STR) -logger.info("Included routes, app should now be reachable") + return app -if settings.DEV_MODE: - logger.info("Staging protocol changes...") - status = stage_protocol_changes() - logger.info(f"Staging protocol result: {status}") +app = create_app() -# needed for Google Kubernetes Engine LoadBalancer Healthcheck @app.get( "/", name="healthcheck", @@ -70,65 +100,4 @@ async def lifespan(app: FastAPI) -> Any: response_class=JSONResponse, ) def healthcheck() -> dict[str, str]: - """ - Currently, all service backends must satisfy either of the following requirements to - pass the HTTP health checks sent to it from the GCE loadbalancer: 1. Respond with a - 200 on '/'. The content does not matter. 2. Expose an arbitrary url as a readiness - probe on the pods backing the Service. - """ return {"status": "ok"} - - -if settings.TRACING_ENABLED: - try: - # stdlib - import os - - endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", None) - # third party - from opentelemetry._logs import set_logger_provider - from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter - from opentelemetry.sdk._logs import LoggerProvider - from opentelemetry.sdk._logs import LoggingHandler - from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - from opentelemetry.sdk.resources import Resource - - logger_provider = LoggerProvider( - resource=Resource.create( - { - "service.name": "backend-container", - } - ), - ) - set_logger_provider(logger_provider) - - exporter = OTLPLogExporter(insecure=True, endpoint=endpoint) - - logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) - handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider) - - # Attach OTLP handler to root logger - logging.getLogger().addHandler(handler) - logger = logging.getLogger(__name__) - message = "> Added OTEL BatchLogRecordProcessor" - print(message) - logger.info(message) - - except Exception as e: - print(f"Failed to load OTLPLogExporter. {e}") - - # third party - try: - # third party - from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - - FastAPIInstrumentor.instrument_app(app) - message = "> Added OTEL FastAPIInstrumentor" - print(message) - logger = logging.getLogger(__name__) - logger.info(message) - except Exception as e: - error = f"Failed to load FastAPIInstrumentor. {e}" - print(error) - logger = logging.getLogger(__name__) - logger.error(message) diff --git a/packages/grid/backend/grid/start.sh b/packages/grid/backend/grid/start.sh index 79ca493d4c2..2831efa5b6f 100755 --- a/packages/grid/backend/grid/start.sh +++ b/packages/grid/backend/grid/start.sh @@ -48,7 +48,7 @@ then export OTEL_SERVICE_NAME="${DEPLOYMENT_ENV}-${OTEL_SERVICE_NAME:-"backend"}" export OTEL_RESOURCE_ATTRIBUTES=$(IFS=, ; echo "${RESOURCE_ATTRS[*]}") - echo "OpenTelemetry Enabled." + echo "OpenTelemetry Enabled" env | grep OTEL_ else echo "OpenTelemetry Disabled" diff --git a/packages/syft/src/syft/server/uvicorn.py b/packages/syft/src/syft/server/uvicorn.py index 80f15a6d5ba..7b975362c8e 100644 --- a/packages/syft/src/syft/server/uvicorn.py +++ b/packages/syft/src/syft/server/uvicorn.py @@ -27,7 +27,7 @@ from ..deployment_type import DeploymentType from ..util.autoreload import enable_autoreload from ..util.constants import DEFAULT_TIMEOUT -from ..util.telemetry import TRACING_ENABLED +from ..util.telemetry import instrument_fastapi from ..util.util import os_name from .datasite import Datasite from .enclave import Enclave @@ -114,14 +114,7 @@ def app_factory() -> FastAPI: allow_methods=["*"], allow_headers=["*"], ) - - if TRACING_ENABLED: - # third party - from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - - FastAPIInstrumentor().instrument_app(app) - print("> Added OTEL FastAPIInstrumentor") - + instrument_fastapi(app) return app diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index 0921ea9e704..ea0e5c7dde8 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -101,7 +101,7 @@ from .utils import parse_code from .utils import submit_subjobs_code -logger = logging.getLogger(name=__name__) +logger = logging.getLogger(__name__) if TYPE_CHECKING: # relative diff --git a/packages/syft/src/syft/service/notifier/notifier_service.py b/packages/syft/src/syft/service/notifier/notifier_service.py index d8c1da2530f..4c86fd7f5e9 100644 --- a/packages/syft/src/syft/service/notifier/notifier_service.py +++ b/packages/syft/src/syft/service/notifier/notifier_service.py @@ -131,14 +131,14 @@ def turn_on( public_message="You must provide both server and port to enable notifications." ) - logging.debug("Got notifier from db") + logger.debug("Got notifier from db") skip_auth: bool = False # If no new credentials provided, check for existing ones if not (email_username and email_password): if not (notifier.email_username and notifier.email_password): skip_auth = True else: - logging.debug("No new credentials provided. Using existing ones.") + logger.debug("No new credentials provided. Using existing ones.") email_password = notifier.email_password email_username = notifier.email_username @@ -152,7 +152,7 @@ def turn_on( ) if not valid_credentials: - logging.error("Invalid SMTP credentials.") + logger.error("Invalid SMTP credentials.") raise SyftException(public_message=("Invalid SMTP credentials.")) notifier.email_password = email_password @@ -183,7 +183,7 @@ def turn_on( notifier.email_sender = email_sender notifier.active = True - logging.debug( + logger.debug( "Email credentials are valid. Updating the notifier settings in the db." ) @@ -345,7 +345,7 @@ def dispatch_notification( # If notifier is active if notifier.active and notification.email_template is not None: - logging.debug("Checking user email activity") + logger.debug("Checking user email activity") if notifier.email_activity.get(notification.email_template.__name__, None): user_activity = notifier.email_activity[ diff --git a/packages/syft/src/syft/service/queue/zmq_client.py b/packages/syft/src/syft/service/queue/zmq_client.py index 9265d9edd3d..1c68ded7537 100644 --- a/packages/syft/src/syft/service/queue/zmq_client.py +++ b/packages/syft/src/syft/service/queue/zmq_client.py @@ -1,5 +1,6 @@ # stdlib from collections import defaultdict +import logging import socketserver # relative @@ -21,6 +22,8 @@ from .zmq_consumer import ZMQConsumer from .zmq_producer import ZMQProducer +logger = logging.getLogger(__name__) + @serializable() class ZMQClientConfig(SyftObject, QueueClientConfig): @@ -77,7 +80,9 @@ def add_producer( else: port = self.config.queue_port - print(f"Adding producer for queue: {queue_name} on: {get_queue_address(port)}") + logger.info( + f"Adding producer for queue: {queue_name} on: {get_queue_address(port)}" + ) producer = ZMQProducer( queue_name=queue_name, queue_stash=queue_stash, diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 64171c5f90b..01d440879bd 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -353,9 +353,6 @@ def create_kubernetes_pool( "CREATE_PRODUCER": "False", "INMEMORY_WORKERS": "False", "OTEL_SERVICE_NAME": f"{pool_name}", - "OTEL_PYTHON_LOG_CORRELATION": os.environ.get( - "OTEL_PYTHON_LOG_CORRELATION" - ), "OTEL_EXPORTER_OTLP_ENDPOINT": os.environ.get( "OTEL_EXPORTER_OTLP_ENDPOINT" ), diff --git a/packages/syft/src/syft/store/blob_storage/seaweedfs.py b/packages/syft/src/syft/store/blob_storage/seaweedfs.py index 3409035906f..99ae35e6cea 100644 --- a/packages/syft/src/syft/store/blob_storage/seaweedfs.py +++ b/packages/syft/src/syft/store/blob_storage/seaweedfs.py @@ -42,26 +42,16 @@ from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.uid import UID from ...util.constants import DEFAULT_TIMEOUT -from ...util.telemetry import TRACING_ENABLED - -logger = logging.getLogger(__name__) +from ...util.telemetry import instrument_botocore MAX_QUEUE_SIZE = 100 WRITE_EXPIRATION_TIME = 900 # seconds DEFAULT_FILE_PART_SIZE = 1024**3 # 1GB DEFAULT_UPLOAD_CHUNK_SIZE = 1024 * 800 # 800KB -if TRACING_ENABLED: - try: - # third party - from opentelemetry.instrumentation.botocore import BotocoreInstrumentor - - BotocoreInstrumentor().instrument() - message = "> Added OTEL BotocoreInstrumentor" - print(message) - logger.info(message) - except Exception: # nosec - pass +logger = logging.getLogger(__name__) + +instrument_botocore() @serializable() diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py index 9767059a1cb..209325087a9 100644 --- a/packages/syft/src/syft/store/mongo_client.py +++ b/packages/syft/src/syft/store/mongo_client.py @@ -1,5 +1,4 @@ # stdlib -import logging from threading import Lock from typing import Any @@ -13,24 +12,13 @@ from ..serde.serializable import serializable from ..types.errors import SyftException from ..types.result import as_result -from ..util.telemetry import TRACING_ENABLED +from ..util.telemetry import instrument_mongo from .document_store import PartitionSettings from .document_store import StoreClientConfig from .document_store import StoreConfig from .mongo_codecs import SYFT_CODEC_OPTIONS -if TRACING_ENABLED: - try: - # third party - from opentelemetry.instrumentation.pymongo import PymongoInstrumentor - - PymongoInstrumentor().instrument() - message = "> Added OTEL PymongoInstrumentor" - print(message) - logger = logging.getLogger(__name__) - logger.info(message) - except Exception: # nosec - pass +instrument_mongo() @serializable(canonical_name="MongoStoreClientConfig", version=1) diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index b609a0f023b..00fe0bcf2a9 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -1,36 +1,29 @@ # stdlib -from collections.abc import Callable import logging import os from typing import Any -from typing import TypeVar # relative +from . import trace_decorator from .. import __version__ from .util import str_to_bool -__all__ = ["TRACING_ENABLED", "instrument"] - -logger = logging.getLogger(__name__) +__all__ = [ + "TRACING_ENABLED", + "instrument", + "instrument_fastapi", + "instrument_mongo", + "instrument_botocore", +] TRACING_ENABLED = str_to_bool(os.environ.get("TRACING", "False")) +logger = logging.getLogger(__name__) -T = TypeVar("T", bound=Callable | type) - - -def noop(__func_or_class: T | None = None, /, *args: Any, **kwargs: Any) -> T: - def noop_wrapper(__func_or_class: T) -> T: - return __func_or_class - - if __func_or_class is None: - return noop_wrapper # type: ignore - else: - return __func_or_class +def setup_instrumenter() -> Any: + if not TRACING_ENABLED: + return trace_decorator.no_instrument -if not TRACING_ENABLED: - instrument = noop -else: try: # third party from opentelemetry import trace @@ -43,14 +36,11 @@ def noop_wrapper(__func_or_class: T) -> T: from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor - # relative - from .trace_decorator import instrument as _instrument - # create a resource resource = Resource({"syft.version": __version__}) resource = resource.merge(OTELResourceDetector().detect()) resource = resource.merge(ProcessResourceDetector().detect()) - logger.info(f"OTEL Resource: {resource.__dict__}") + logger.debug(f"OTEL resource : {resource.__dict__}") # create a trace provider from the resource provider = TracerProvider(resource=resource) @@ -63,8 +53,53 @@ def noop_wrapper(__func_or_class: T) -> T: # set the global trace provider trace.set_tracer_provider(provider) - # expose the instrument decorator - instrument = _instrument + logger.info("Added TracerProvider with BatchSpanProcessor") + return trace_decorator.instrument except Exception as e: logger.error("Failed to import opentelemetry", exc_info=e) - instrument = noop + return trace_decorator.no_instrument + + +def instrument_fastapi(app: Any) -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + FastAPIInstrumentor().instrument_app(app) + logger.info("Added OTEL FastAPIInstrumentor") + except Exception as e: + logger.error(f"Failed to load FastAPIInstrumentor. {e}") + + +def instrument_mongo() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.pymongo import PymongoInstrumentor + + PymongoInstrumentor().instrument() + logger.info("Added OTEL PymongoInstrumentor") + except Exception as e: + logger.error(f"Failed to load PymongoInstrumentor. {e}") + + +def instrument_botocore() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.botocore import BotocoreInstrumentor + + BotocoreInstrumentor().instrument() + logger.info("Added OTEL BotocoreInstrumentor") + except Exception as e: + logger.error(f"Failed to load BotocoreInstrumentor. {e}") + + +instrument = setup_instrumenter() diff --git a/packages/syft/src/syft/util/trace_decorator.py b/packages/syft/src/syft/util/trace_decorator.py index 1746fafc095..403ad188d6b 100644 --- a/packages/syft/src/syft/util/trace_decorator.py +++ b/packages/syft/src/syft/util/trace_decorator.py @@ -17,6 +17,10 @@ from opentelemetry.trace import Tracer from opentelemetry.trace.span import Span +__all__ = ["instrument", "no_instrument", "T"] + +T = TypeVar("T", bound=Callable | type) + class TracingDecoratorOptions: class NamingSchemes: @@ -40,9 +44,6 @@ def set_default_attributes(cls, attributes: dict[str, str] | None = None) -> Non cls.default_attributes[att] = attributes[att] -T = TypeVar("T", bound=Callable | type) - - def instrument( _func_or_class: T | None = None, /, @@ -171,3 +172,13 @@ async def wrap_with_span_async(*args: Any, **kwargs: Any) -> Callable: return span_decorator(_func_or_class) else: return span_decorator # type: ignore + + +def no_instrument(__func_or_class: T | None = None, /, *args: Any, **kwargs: Any) -> T: + def noop_wrapper(__func_or_class: T) -> T: + return __func_or_class + + if __func_or_class is None: + return noop_wrapper # type: ignore + else: + return __func_or_class From 5c0d49cc5edb720d6bd5c601fe29520ff6bf8dea Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Thu, 19 Sep 2024 18:07:04 +0530 Subject: [PATCH 08/13] rename remaining mongo references to postgres --- .../bigquery/010-setup-bigquery-pool.ipynb | 2 +- packages/grid/helm/syft/values.yaml | 2 +- packages/syft/src/syft/store/mongo_client.py | 263 ------------------ packages/syft/src/syft/util/telemetry.py | 14 - packages/syft/tests/conftest.py | 1 - tox.ini | 2 +- 6 files changed, 3 insertions(+), 281 deletions(-) delete mode 100644 packages/syft/src/syft/store/mongo_client.py diff --git a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb index d72a82f9eb1..e6e6dddcc62 100644 --- a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb +++ b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb @@ -350,7 +350,7 @@ "metadata": {}, "outputs": [], "source": [ - "# overwrite it for now Mongo ignore\n", + "# overwrite it for now Postgres ignore\n", "result = high_client.api.services.worker_image.submit(worker_config=docker_config)\n", "result" ] diff --git a/packages/grid/helm/syft/values.yaml b/packages/grid/helm/syft/values.yaml index e78910db98b..b6728cefa2a 100644 --- a/packages/grid/helm/syft/values.yaml +++ b/packages/grid/helm/syft/values.yaml @@ -36,7 +36,7 @@ postgres: # PVC storage size storageSize: 5Gi - # Mongo secret name. Override this if you want to use a self-managed secret. + # Postgres secret name. Override this if you want to use a self-managed secret. secretKeyName: postgres-secret # default/custom secret raw values diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py deleted file mode 100644 index 209325087a9..00000000000 --- a/packages/syft/src/syft/store/mongo_client.py +++ /dev/null @@ -1,263 +0,0 @@ -# stdlib -from threading import Lock -from typing import Any - -# third party -from pymongo.collection import Collection as MongoCollection -from pymongo.database import Database as MongoDatabase -from pymongo.errors import ConnectionFailure -from pymongo.mongo_client import MongoClient as PyMongoClient - -# relative -from ..serde.serializable import serializable -from ..types.errors import SyftException -from ..types.result import as_result -from ..util.telemetry import instrument_mongo -from .document_store import PartitionSettings -from .document_store import StoreClientConfig -from .document_store import StoreConfig -from .mongo_codecs import SYFT_CODEC_OPTIONS - -instrument_mongo() - - -@serializable(canonical_name="MongoStoreClientConfig", version=1) -class MongoStoreClientConfig(StoreClientConfig): - """ - Paramaters: - `hostname`: optional string - hostname or IP address or Unix domain socket path of a single mongod or mongos - instance to connect to, or a mongodb URI, or a list of hostnames (but no more - than one mongodb URI). If `host` is an IPv6 literal it must be enclosed in '[' - and ']' characters following the RFC2732 URL syntax (e.g. '[::1]' for localhost). - Multihomed and round robin DNS addresses are **not** supported. - `port` : optional int - port number on which to connect - `directConnection`: bool - if ``True``, forces this client to connect directly to the specified MongoDB host - as a standalone. If ``false``, the client connects to the entire replica set of which - the given MongoDB host(s) is a part. If this is ``True`` and a mongodb+srv:// URI - or a URI containing multiple seeds is provided, an exception will be raised. - `maxPoolSize`: int. Default 100 - The maximum allowable number of concurrent connections to each connected server. - Requests to a server will block if there are `maxPoolSize` outstanding connections - to the requested server. Defaults to 100. Can be either 0 or None, in which case - there is no limit on the number of concurrent connections. - `minPoolSize` : int. Default 0 - The minimum required number of concurrent connections that the pool will maintain - to each connected server. Default is 0. - `maxIdleTimeMS`: int - The maximum number of milliseconds that a connection can remain idle in the pool - before being removed and replaced. Defaults to `None` (no limit). - `appname`: string - The name of the application that created this MongoClient instance. The server will - log this value upon establishing each connection. It is also recorded in the slow - query log and profile collections. - `maxConnecting`: optional int - The maximum number of connections that each pool can establish concurrently. - Defaults to `2`. - `timeoutMS`: (integer or None) - Controls how long (in milliseconds) the driver will wait when executing an operation - (including retry attempts) before raising a timeout error. ``0`` or ``None`` means - no timeout. - `socketTimeoutMS`: (integer or None) - Controls how long (in milliseconds) the driver will wait for a response after sending - an ordinary (non-monitoring) database operation before concluding that a network error - has occurred. ``0`` or ``None`` means no timeout. Defaults to ``None`` (no timeout). - `connectTimeoutMS`: (integer or None) - Controls how long (in milliseconds) the driver will wait during server monitoring when - connecting a new socket to a server before concluding the server is unavailable. - ``0`` or ``None`` means no timeout. Defaults to ``20000`` (20 seconds). - `serverSelectionTimeoutMS`: (integer) - Controls how long (in milliseconds) the driver will wait to find an available, appropriate - server to carry out a database operation; while it is waiting, multiple server monitoring - operations may be carried out, each controlled by `connectTimeoutMS`. - Defaults to ``120000`` (120 seconds). - `waitQueueTimeoutMS`: (integer or None) - How long (in milliseconds) a thread will wait for a socket from the pool if the pool - has no free sockets. Defaults to ``None`` (no timeout). - `heartbeatFrequencyMS`: (optional) - The number of milliseconds between periodic server checks, or None to accept the default - frequency of 10 seconds. - # Auth - username: str - Database username - password: str - Database pass - authSource: str - The database to authenticate on. - Defaults to the database specified in the URI, if provided, or to “admin”. - tls: bool - If True, create the connection to the server using transport layer security. - Defaults to False. - # Testing and connection reuse - client: Optional[PyMongoClient] - If provided, this client is reused. Default = None - - """ - - # Connection - hostname: str | None = "127.0.0.1" - port: int | None = None - directConnection: bool = False - maxPoolSize: int = 200 - minPoolSize: int = 0 - maxIdleTimeMS: int | None = None - maxConnecting: int = 3 - timeoutMS: int = 0 - socketTimeoutMS: int = 0 - connectTimeoutMS: int = 20000 - serverSelectionTimeoutMS: int = 120000 - waitQueueTimeoutMS: int | None = None - heartbeatFrequencyMS: int = 10000 - appname: str = "pysyft" - # Auth - username: str | None = None - password: str | None = None - authSource: str = "admin" - tls: bool | None = False - # Testing and connection reuse - client: Any = None - - # this allows us to have one connection per `Server` object - # in the MongoClientCache - server_obj_python_id: int | None = None - - -class MongoClientCache: - __client_cache__: dict[int, type["MongoClient"] | None] = {} - _lock: Lock = Lock() - - @classmethod - def from_cache(cls, config: MongoStoreClientConfig) -> PyMongoClient | None: - return cls.__client_cache__.get(hash(str(config)), None) - - @classmethod - def set_cache(cls, config: MongoStoreClientConfig, client: PyMongoClient) -> None: - with cls._lock: - cls.__client_cache__[hash(str(config))] = client - - -class MongoClient: - client: PyMongoClient = None - - def __init__(self, config: MongoStoreClientConfig, cache: bool = True) -> None: - self.config = config - if config.client is not None: - self.client = config.client - elif cache: - self.client = MongoClientCache.from_cache(config=config) - - if not cache or self.client is None: - self.connect(config=config).unwrap() - - @as_result(SyftException) - def connect(self, config: MongoStoreClientConfig) -> bool: - self.client = PyMongoClient( - # Connection - host=config.hostname, - port=config.port, - directConnection=config.directConnection, - maxPoolSize=config.maxPoolSize, - minPoolSize=config.minPoolSize, - maxIdleTimeMS=config.maxIdleTimeMS, - maxConnecting=config.maxConnecting, - timeoutMS=config.timeoutMS, - socketTimeoutMS=config.socketTimeoutMS, - connectTimeoutMS=config.connectTimeoutMS, - serverSelectionTimeoutMS=config.serverSelectionTimeoutMS, - waitQueueTimeoutMS=config.waitQueueTimeoutMS, - heartbeatFrequencyMS=config.heartbeatFrequencyMS, - appname=config.appname, - # Auth - username=config.username, - password=config.password, - authSource=config.authSource, - tls=config.tls, - uuidRepresentation="standard", - ) - MongoClientCache.set_cache(config=config, client=self.client) - try: - # Check if mongo connection is still up - self.client.admin.command("ping") - except ConnectionFailure as e: - self.client = None - raise SyftException.from_exception(e) - - return True - - @as_result(SyftException) - def with_db(self, db_name: str) -> MongoDatabase: - try: - return self.client[db_name] - except BaseException as e: - raise SyftException.from_exception(e) - - @as_result(SyftException) - def with_collection( - self, - collection_settings: PartitionSettings, - store_config: StoreConfig, - collection_name: str | None = None, - ) -> MongoCollection: - db = self.with_db(db_name=store_config.db_name).unwrap() - - try: - collection_name = ( - collection_name - if collection_name is not None - else collection_settings.name - ) - collection = db.get_collection( - name=collection_name, codec_options=SYFT_CODEC_OPTIONS - ) - except BaseException as e: - raise SyftException.from_exception(e) - - return collection - - @as_result(SyftException) - def with_collection_permissions( - self, collection_settings: PartitionSettings, store_config: StoreConfig - ) -> MongoCollection: - """ - For each collection, create a corresponding collection - that store the permissions to the data in that collection - """ - db = self.with_db(db_name=store_config.db_name).unwrap() - - try: - collection_permissions_name: str = collection_settings.name + "_permissions" - collection_permissions = db.get_collection( - name=collection_permissions_name, codec_options=SYFT_CODEC_OPTIONS - ) - except BaseException as e: - raise SyftException.from_exception(e) - return collection_permissions - - @as_result(SyftException) - def with_collection_storage_permissions( - self, collection_settings: PartitionSettings, store_config: StoreConfig - ) -> MongoCollection: - """ - For each collection, create a corresponding collection - that store the permissions to the data in that collection - """ - db = self.with_db(db_name=store_config.db_name).unwrap() - - try: - collection_storage_permissions_name: str = ( - collection_settings.name + "_storage_permissions" - ) - storage_permissons_collection = db.get_collection( - name=collection_storage_permissions_name, - codec_options=SYFT_CODEC_OPTIONS, - ) - except BaseException as e: - raise SyftException.from_exception(e) - - return storage_permissons_collection - - def close(self) -> None: - self.client.close() - MongoClientCache.__client_cache__.pop(hash(str(self.config)), None) diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index 00fe0bcf2a9..2acc09663a1 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -12,7 +12,6 @@ "TRACING_ENABLED", "instrument", "instrument_fastapi", - "instrument_mongo", "instrument_botocore", ] @@ -74,19 +73,6 @@ def instrument_fastapi(app: Any) -> None: logger.error(f"Failed to load FastAPIInstrumentor. {e}") -def instrument_mongo() -> None: - if not TRACING_ENABLED: - return - - try: - # third party - from opentelemetry.instrumentation.pymongo import PymongoInstrumentor - - PymongoInstrumentor().instrument() - logger.info("Added OTEL PymongoInstrumentor") - except Exception as e: - logger.error(f"Failed to load PymongoInstrumentor. {e}") - def instrument_botocore() -> None: if not TRACING_ENABLED: diff --git a/packages/syft/tests/conftest.py b/packages/syft/tests/conftest.py index 56506b43fad..e9b226a6e13 100644 --- a/packages/syft/tests/conftest.py +++ b/packages/syft/tests/conftest.py @@ -26,7 +26,6 @@ from syft.service.queue.queue_stash import QueueStash from syft.service.user import user -# our version of mongomock that has a fix for CodecOptions and custom TypeRegistry Support def patch_protocol_file(filepath: Path): diff --git a/tox.ini b/tox.ini index 4f405694dc0..95bf457df90 100644 --- a/tox.ini +++ b/tox.ini @@ -908,7 +908,7 @@ commands = (kubectl logs service/backend --context k3d-$CLUSTER_NAME --namespace syft -f &) | grep -q 'Application startup complete' || true; \ fi" - # Mongo + # Postgres bash -c "if echo '{posargs}' | grep -q 'postgres'; then echo 'Checking readiness of Postgres'; ./scripts/wait_for.sh service postgres --context k3d-$CLUSTER_NAME --namespace syft; fi" # Proxy From c748a6d453ebaadc302ec2dea7126e855752aa78 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Fri, 20 Sep 2024 02:12:03 +0530 Subject: [PATCH 09/13] add thread + postgres instrumentation --- packages/grid/backend/grid/logging.yaml | 2 +- packages/syft/setup.cfg | 2 +- packages/syft/src/syft/__init__.py | 3 +++ packages/syft/src/syft/store/db/db.py | 2 ++ packages/syft/src/syft/util/telemetry.py | 29 +++++++++++++++++++++++- packages/syft/tests/conftest.py | 1 - 6 files changed, 35 insertions(+), 4 deletions(-) diff --git a/packages/grid/backend/grid/logging.yaml b/packages/grid/backend/grid/logging.yaml index ddf4cc4a82b..9d5d3954f55 100644 --- a/packages/grid/backend/grid/logging.yaml +++ b/packages/grid/backend/grid/logging.yaml @@ -3,7 +3,7 @@ disable_existing_loggers: True formatters: default: - format: "ROOT %(asctime)s - %(levelname)s - %(name)s - %(message)s" + format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" uvicorn.default: "()": uvicorn.logging.DefaultFormatter format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" diff --git a/packages/syft/setup.cfg b/packages/syft/setup.cfg index d4ee2cff521..7d1a8b2a616 100644 --- a/packages/syft/setup.cfg +++ b/packages/syft/setup.cfg @@ -117,9 +117,9 @@ telemetry = opentelemetry-instrumentation-botocore==0.48b0 opentelemetry-instrumentation-logging==0.48b0 opentelemetry-instrumentation-sqlalchemy==0.48b0 + opentelemetry-instrumentation-threading==0.48b0 ; opentelemetry-instrumentation-asyncio==0.48b0 ; opentelemetry-instrumentation-sqlite3==0.48b0 - ; opentelemetry-instrumentation-threading==0.48b0 ; opentelemetry-instrumentation-jinja2==0.48b0 ; opentelemetry-instrumentation-system-metrics==0.48b0 diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index 2534f22077e..465e7a69fef 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -90,6 +90,7 @@ from .util.commit import __commit__ from .util.patch_ipython import patch_ipython from .util.telemetry import instrument +from .util.telemetry import instrument_threads from .util.util import autocache from .util.util import get_root_data_path from .util.version_compare import make_requires @@ -103,6 +104,8 @@ sys.path.append(str(Path(__file__))) +instrument_threads() + patch_ipython() diff --git a/packages/syft/src/syft/store/db/db.py b/packages/syft/src/syft/store/db/db.py index cc82e5a3f4e..08d85b2fa53 100644 --- a/packages/syft/src/syft/store/db/db.py +++ b/packages/syft/src/syft/store/db/db.py @@ -13,10 +13,12 @@ from ...serde.serializable import serializable from ...server.credentials import SyftVerifyKey from ...types.uid import UID +from ...util.telemetry import instrument_sqlalchemny from .schema import PostgresBase from .schema import SQLiteBase logger = logging.getLogger(__name__) +instrument_sqlalchemny() @serializable(canonical_name="DBConfig", version=1) diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index 2acc09663a1..d14b1235e2d 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -73,7 +73,6 @@ def instrument_fastapi(app: Any) -> None: logger.error(f"Failed to load FastAPIInstrumentor. {e}") - def instrument_botocore() -> None: if not TRACING_ENABLED: return @@ -88,4 +87,32 @@ def instrument_botocore() -> None: logger.error(f"Failed to load BotocoreInstrumentor. {e}") +def instrument_threads() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.threading import ThreadingInstrumentor + + ThreadingInstrumentor().instrument() + logger.info("Added OTEL ThreadingInstrumentor") + except Exception as e: + logger.error(f"Failed to load ThreadingInstrumentor. {e}") + + +def instrument_sqlalchemny() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + SQLAlchemyInstrumentor().instrument(enable_commenter=True, commenter_options={}) + logger.info("Added OTEL SQLAlchemyInstrumentor") + except Exception as e: + logger.error(f"Failed to load SQLAlchemyInstrumentor. {e}") + + instrument = setup_instrumenter() diff --git a/packages/syft/tests/conftest.py b/packages/syft/tests/conftest.py index e9b226a6e13..a3908f59eae 100644 --- a/packages/syft/tests/conftest.py +++ b/packages/syft/tests/conftest.py @@ -27,7 +27,6 @@ from syft.service.user import user - def patch_protocol_file(filepath: Path): dp = get_data_protocol() shutil.copyfile(src=dp.file_path, dst=filepath) From 2997ae06d4218177d1e35838415cf8452ca7ac81 Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Mon, 23 Sep 2024 15:30:01 +0530 Subject: [PATCH 10/13] lint --- packages/syft/src/syft/store/db/stash.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/store/db/stash.py b/packages/syft/src/syft/store/db/stash.py index 85860821866..be5bae5327b 100644 --- a/packages/syft/src/syft/store/db/stash.py +++ b/packages/syft/src/syft/store/db/stash.py @@ -102,7 +102,7 @@ def __init__(self, store: DBManager) -> None: self.db = store self.object_type = self.get_object_type() self.table = create_table(self.object_type, self.dialect) - self.sessionmaker = self.db.sessionmaker + self.sessionmaker: Callable[[], Session] = self.db.sessionmaker @property def dialect(self) -> sa.engine.interfaces.Dialect: From ff6e9f270810de32039ccdf326def9b9e420a08b Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Tue, 24 Sep 2024 23:04:16 +0530 Subject: [PATCH 11/13] drop signoz cluster from tox --- tox.ini | 51 +-------------------------------------------------- 1 file changed, 1 insertion(+), 50 deletions(-) diff --git a/tox.ini b/tox.ini index 982a3ecc423..73b71ff1c56 100644 --- a/tox.ini +++ b/tox.ini @@ -1037,39 +1037,6 @@ commands = ; restarts coredns bash -c 'kubectl delete pod -n kube-system -l k8s-app=kube-dns --context k3d-${CLUSTER_NAME}' -[testenv:dev.k8s.add.signoz] -description = Install Signoz on Kubernetes cluster -changedir = {toxinidir} -passenv=HOME,USER -setenv = - CLUSTER_NAME = {env:CLUSTER_NAME:syft-dev} -allowlist_externals = - bash - tox - helm - curl -commands = - helm install signoz signoz \ - --repo https://charts.signoz.io \ - --namespace platform \ - --create-namespace \ - --version 0.52.0 \ - --set frontend.service.type=LoadBalancer \ - --set otelCollector.service.type=LoadBalancer \ - --set otelCollectorMetrics.service.type=LoadBalancer - - ; wait for signoz frontend - bash -c 'echo Waiting for signoz-frontend; \ - WAIT_TIME=5 source packages/grid/scripts/wait_for.sh service signoz-frontend --namespace platform --context k3d-signoz > /dev/null' - - ; setup dafault account - curl --retry 5 --retry-all-errors -X POST \ - -H "Content-Type: application/json" \ - --data '{"email":"admin@localhost","name":"admin","orgName":"openmined","password":"password"}' \ - http://localhost:3301/api/v1/register - - bash -c "printf 'Signoz is running on http://localhost:3301\nEmail: \033[1;36madmin@localhost\033[0m\nPassword: \033[1;36mpassword\033[0m\n'" - [testenv:dev.k8s.add.collector] description = Install signoz/k8s-infra on Kubernetes cluster @@ -1084,7 +1051,7 @@ commands = helm install k8s-infra k8s-infra \ --repo https://charts.signoz.io \ --kube-context k3d-{env:CLUSTER_NAME} \ - --set global.deploymentEnvironment=local-dev \ + --set global.deploymentEnvironment=local \ --set clusterName={env:CLUSTER_NAME} \ --set otelCollectorEndpoint=http://{env:SIGNOZ_HOST}:4317 \ --set otelInsecure=true \ @@ -1122,22 +1089,6 @@ commands = ; dump cluster info tox -e dev.k8s.info -[testenv:dev.k8s.start.signoz] -description = Start local Kubernetes registry with Signoz installed -changedir = {toxinidir} -passenv=HOME,USER -allowlist_externals = - bash - tox -commands = - bash -c 'k3d cluster create signoz \ - -p "3301:3301@loadbalancer" \ - -p "4317:4317@loadbalancer" \ - --k3s-arg "--disable=metrics-server@server:*"' - - ; add signoz - tox -e dev.k8s.add.signoz - [testenv:dev.k8s.deploy] description = Deploy Syft to a local Kubernetes cluster with Devspace changedir = {toxinidir}/packages/grid From 4f060e5b3fdbeb13a316f68b3023394ba2abad41 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Tue, 24 Sep 2024 23:10:42 +0530 Subject: [PATCH 12/13] undo some tox changes --- tox.ini | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tox.ini b/tox.ini index 73b71ff1c56..6e767ee922b 100644 --- a/tox.ini +++ b/tox.ini @@ -1148,7 +1148,7 @@ allowlist_externals = bash commands = bash -c 'devspace purge --force-purge --kube-context k3d-${CLUSTER_NAME} --no-warn --namespace syft; sleep 3' - ; bash -c 'devspace cleanup images --kube-context k3d-${CLUSTER_NAME} --no-warn --namespace syft --var CONTAINER_REGISTRY=k3d-registry.localhost:5800 || true' + bash -c 'devspace cleanup images --kube-context k3d-${CLUSTER_NAME} --no-warn --namespace syft --var CONTAINER_REGISTRY=k3d-registry.localhost:5800 || true' bash -c 'kubectl --context k3d-${CLUSTER_NAME} delete namespace syft --now=true || true' [testenv:dev.k8s.render] @@ -1292,9 +1292,6 @@ commands = ; destroy cluster tox -e dev.k8s.destroy - ; destroy cluster - bash -c 'CLUSTER_NAME=signoz tox -e dev.k8s.destroy' - ; destroy registry bash -c 'k3d registry delete registry.localhost || true' bash -c 'docker volume rm k3d-registry-vol --force || true' From 283f747bf121cda7e19412793c16369882756b56 Mon Sep 17 00:00:00 2001 From: Yash Gorana Date: Tue, 24 Sep 2024 23:51:36 +0530 Subject: [PATCH 13/13] fix tracing being imported when disabled --- packages/syft/src/syft/util/telemetry.py | 24 +++++++++++++++---- .../syft/src/syft/util/trace_decorator.py | 12 +--------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index d14b1235e2d..99a95d991d1 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -1,10 +1,11 @@ # stdlib +from collections.abc import Callable import logging import os from typing import Any +from typing import TypeVar # relative -from . import trace_decorator from .. import __version__ from .util import str_to_bool @@ -18,10 +19,22 @@ TRACING_ENABLED = str_to_bool(os.environ.get("TRACING", "False")) logger = logging.getLogger(__name__) +T = TypeVar("T", bound=Callable | type) + + +def no_instrument(__func_or_class: T | None = None, /, *args: Any, **kwargs: Any) -> T: + def noop_wrapper(__func_or_class: T) -> T: + return __func_or_class + + if __func_or_class is None: + return noop_wrapper # type: ignore + else: + return __func_or_class + def setup_instrumenter() -> Any: if not TRACING_ENABLED: - return trace_decorator.no_instrument + return no_instrument try: # third party @@ -35,6 +48,9 @@ def setup_instrumenter() -> Any: from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor + # relative + from .trace_decorator import instrument + # create a resource resource = Resource({"syft.version": __version__}) resource = resource.merge(OTELResourceDetector().detect()) @@ -53,10 +69,10 @@ def setup_instrumenter() -> Any: trace.set_tracer_provider(provider) logger.info("Added TracerProvider with BatchSpanProcessor") - return trace_decorator.instrument + return instrument except Exception as e: logger.error("Failed to import opentelemetry", exc_info=e) - return trace_decorator.no_instrument + return no_instrument def instrument_fastapi(app: Any) -> None: diff --git a/packages/syft/src/syft/util/trace_decorator.py b/packages/syft/src/syft/util/trace_decorator.py index 403ad188d6b..eaa259330cd 100644 --- a/packages/syft/src/syft/util/trace_decorator.py +++ b/packages/syft/src/syft/util/trace_decorator.py @@ -17,7 +17,7 @@ from opentelemetry.trace import Tracer from opentelemetry.trace.span import Span -__all__ = ["instrument", "no_instrument", "T"] +__all__ = ["instrument"] T = TypeVar("T", bound=Callable | type) @@ -172,13 +172,3 @@ async def wrap_with_span_async(*args: Any, **kwargs: Any) -> Callable: return span_decorator(_func_or_class) else: return span_decorator # type: ignore - - -def no_instrument(__func_or_class: T | None = None, /, *args: Any, **kwargs: Any) -> T: - def noop_wrapper(__func_or_class: T) -> T: - return __func_or_class - - if __func_or_class is None: - return noop_wrapper # type: ignore - else: - return __func_or_class