diff --git a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb index d72a82f9eb1..e6e6dddcc62 100644 --- a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb +++ b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb @@ -350,7 +350,7 @@ "metadata": {}, "outputs": [], "source": [ - "# overwrite it for now Mongo ignore\n", + "# overwrite it for now Postgres ignore\n", "result = high_client.api.services.worker_image.submit(worker_config=docker_config)\n", "result" ] diff --git a/packages/grid/backend/backend.dockerfile b/packages/grid/backend/backend.dockerfile index 984d2f174f4..f612d690958 100644 --- a/packages/grid/backend/backend.dockerfile +++ b/packages/grid/backend/backend.dockerfile @@ -75,7 +75,7 @@ ENV \ APPDIR="/root/app" \ SERVER_NAME="default_server_name" \ SERVER_TYPE="datasite" \ - SERVICE_NAME="backend" \ + SERVER_SIDE_TYPE="high" \ RELEASE="production" \ DEV_MODE="False" \ DEBUGGER_ENABLED="False" \ diff --git a/packages/grid/backend/grid/logging.yaml b/packages/grid/backend/grid/logging.yaml index b41eb783038..9d5d3954f55 100644 --- a/packages/grid/backend/grid/logging.yaml +++ b/packages/grid/backend/grid/logging.yaml @@ -1,16 +1,16 @@ version: 1 disable_existing_loggers: True + formatters: default: format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" - datefmt: "%Y-%m-%d %H:%M:%S" uvicorn.default: "()": uvicorn.logging.DefaultFormatter format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" uvicorn.access: "()": "uvicorn.logging.AccessFormatter" format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" - datefmt: "%Y-%m-%d %H:%M:%S" + handlers: default: formatter: default @@ -24,23 +24,32 @@ handlers: formatter: uvicorn.access class: logging.StreamHandler stream: ext://sys.stdout + loggers: + # uvicorn loggers uvicorn.error: level: INFO handlers: - uvicorn.default - propagate: no + propagate: false uvicorn.access: level: INFO handlers: - 
uvicorn.access - propagate: no + propagate: false + # syft & grid loggers syft: level: INFO handlers: - default - propagate: no -root: - level: INFO - handlers: - - default + propagate: false + grid: + level: INFO + handlers: + - default + propagate: false + # root logger + # do not set level, else pip packages will be affected + "": + handlers: + - default diff --git a/packages/grid/backend/grid/main.py b/packages/grid/backend/grid/main.py index 12af4165179..1e488644a30 100644 --- a/packages/grid/backend/grid/main.py +++ b/packages/grid/backend/grid/main.py @@ -6,63 +6,93 @@ # third party from fastapi import FastAPI from fastapi.responses import JSONResponse +from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware # syft absolute -from syft.protocol.data_protocol import stage_protocol_changes +from syft.util.telemetry import instrument_fastapi # server absolute from grid.api.router import api_router from grid.core.config import settings from grid.core.server import worker +# logger => grid.main +logger = logging.getLogger(__name__) + + +class FastAPILogFilter(logging.Filter): + HEALTHCHECK_ENDPOINT = f"{settings.API_V2_STR}/?probe=" -class EndpointFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: - return record.getMessage().find("/api/v2/?probe=livenessProbe") == -1 + return record.getMessage().find(self.HEALTHCHECK_ENDPOINT) == -1 + + +def on_app_startup(app: FastAPI) -> None: + if settings.DEV_MODE: + # syft absolute + from syft.protocol.data_protocol import stage_protocol_changes + + logger.info("Staging protocol changes...") + status = stage_protocol_changes() + logger.info(f"Staging protocol result: {status}") + +def on_app_shutdown(app: FastAPI) -> None: + worker.stop() + logger.info("Worker Stopped") -logger = logging.getLogger("uvicorn.error") -logging.getLogger("uvicorn.access").addFilter(EndpointFilter()) + +def get_middlewares() -> list[Middleware]: + middlewares = [] + + # Set all CORS enabled 
origins + if settings.BACKEND_CORS_ORIGINS: + middlewares.append( + Middleware( + CORSMiddleware, + allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + ) + + return middlewares @asynccontextmanager async def lifespan(app: FastAPI) -> Any: try: + on_app_startup(app) yield finally: - worker.stop() - logger.info("Worker Stop") + on_app_shutdown(app) -app = FastAPI( - title=settings.PROJECT_NAME, - openapi_url=f"{settings.API_V2_STR}/openapi.json", - lifespan=lifespan, -) +def create_app() -> FastAPI: + app = FastAPI( + title=settings.PROJECT_NAME, + openapi_url=f"{settings.API_V2_STR}/openapi.json", + lifespan=lifespan, + middleware=get_middlewares(), + ) + # instrument app + instrument_fastapi(app) -# Set all CORS enabled origins -if settings.BACKEND_CORS_ORIGINS: - app.add_middleware( - CORSMiddleware, - allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) + # patch logger to ignore healthcheck logs + logging.getLogger("uvicorn.access").addFilter(FastAPILogFilter()) + + # add Syft API routes + app.include_router(api_router, prefix=settings.API_V2_STR) -app.include_router(api_router, prefix=settings.API_V2_STR) -logger.info("Included routes, app should now be reachable") + return app -if settings.DEV_MODE: - logger.info("Staging protocol changes...") - status = stage_protocol_changes() - logger.info(f"Staging protocol result: {status}") +app = create_app() -# needed for Google Kubernetes Engine LoadBalancer Healthcheck @app.get( "/", name="healthcheck", @@ -70,65 +100,4 @@ async def lifespan(app: FastAPI) -> Any: response_class=JSONResponse, ) def healthcheck() -> dict[str, str]: - """ - Currently, all service backends must satisfy either of the following requirements to - pass the HTTP health checks sent to it from the GCE loadbalancer: 1. Respond with a - 200 on '/'. 
The content does not matter. 2. Expose an arbitrary url as a readiness - probe on the pods backing the Service. - """ return {"status": "ok"} - - -if settings.TRACING_ENABLED: - try: - # stdlib - import os - - endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", None) - # third party - from opentelemetry._logs import set_logger_provider - from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter - from opentelemetry.sdk._logs import LoggerProvider - from opentelemetry.sdk._logs import LoggingHandler - from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - from opentelemetry.sdk.resources import Resource - - logger_provider = LoggerProvider( - resource=Resource.create( - { - "service.name": "backend-container", - } - ), - ) - set_logger_provider(logger_provider) - - exporter = OTLPLogExporter(insecure=True, endpoint=endpoint) - - logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) - handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider) - - # Attach OTLP handler to root logger - logging.getLogger().addHandler(handler) - logger = logging.getLogger(__name__) - message = "> Added OTEL BatchLogRecordProcessor" - print(message) - logger.info(message) - - except Exception as e: - print(f"Failed to load OTLPLogExporter. {e}") - - # third party - try: - # third party - from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - - FastAPIInstrumentor.instrument_app(app) - message = "> Added OTEL FastAPIInstrumentor" - print(message) - logger = logging.getLogger(__name__) - logger.info(message) - except Exception as e: - error = f"Failed to load FastAPIInstrumentor. 
{e}" - print(error) - logger = logging.getLogger(__name__) - logger.error(message) diff --git a/packages/grid/backend/grid/start.sh b/packages/grid/backend/grid/start.sh index ace8d76e455..2831efa5b6f 100755 --- a/packages/grid/backend/grid/start.sh +++ b/packages/grid/backend/grid/start.sh @@ -7,11 +7,14 @@ APP_MODULE=grid.main:app LOG_LEVEL=${LOG_LEVEL:-info} HOST=${HOST:-0.0.0.0} PORT=${PORT:-80} -SERVER_TYPE=${SERVER_TYPE:-datasite} APPDIR=${APPDIR:-$HOME/app} RELOAD="" ROOT_PROC="" +export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json} +export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key) +export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid) + if [[ ${DEV_MODE} == "True" ]]; then echo "Hot-reload Enabled" @@ -28,20 +31,31 @@ fi if [[ ${TRACING} == "true" ]]; then - # TODOs: - # ? Kubernetes OTel operator is recommended by signoz - export OTEL_PYTHON_LOG_CORRELATION=${OTEL_PYTHON_LOG_CORRELATION:-true} - echo "OpenTelemetry Enabled. 
Endpoint=$OTEL_EXPORTER_OTLP_ENDPOINT Protocol=$OTEL_EXPORTER_OTLP_PROTOCOL" + # TODO: Polish these values up + DEPLOYMENT_ENV="$SERVER_TYPE-$SERVER_SIDE_TYPE" + RESOURCE_ATTRS=( + "deployment.environment=$DEPLOYMENT_ENV" + "service.namespace=$DEPLOYMENT_ENV" + "service.instance.id=$SERVER_UID" + "k8s.pod.name=${K8S_POD_NAME:-"none"}" + "k8s.namespace.name=${K8S_NAMESPACE:-"none"}" + "syft.server.uid=$SERVER_UID" + "syft.server.type=$SERVER_TYPE" + "syft.server.side.type=$SERVER_SIDE_TYPE" + ) + + # environ is always prefixed with the server type + export OTEL_SERVICE_NAME="${DEPLOYMENT_ENV}-${OTEL_SERVICE_NAME:-"backend"}" + export OTEL_RESOURCE_ATTRIBUTES=$(IFS=, ; echo "${RESOURCE_ATTRS[*]}") + + echo "OpenTelemetry Enabled" + env | grep OTEL_ else echo "OpenTelemetry Disabled" fi -export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json} -export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key) -export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid) -export SERVER_TYPE=$SERVER_TYPE - echo "SERVER_UID=$SERVER_UID" echo "SERVER_TYPE=$SERVER_TYPE" +echo "SERVER_SIDE_TYPE=$SERVER_SIDE_TYPE" exec $ROOT_PROC uvicorn $RELOAD --host $HOST --port $PORT --log-config=$APPDIR/grid/logging.yaml "$APP_MODULE" diff --git a/packages/grid/helm/syft/values.yaml b/packages/grid/helm/syft/values.yaml index e78910db98b..b6728cefa2a 100644 --- a/packages/grid/helm/syft/values.yaml +++ b/packages/grid/helm/syft/values.yaml @@ -36,7 +36,7 @@ postgres: # PVC storage size storageSize: 5Gi - # Mongo secret name. Override this if you want to use a self-managed secret. + # Postgres secret name. Override this if you want to use a self-managed secret. 
secretKeyName: postgres-secret # default/custom secret raw values diff --git a/packages/syft/setup.cfg b/packages/syft/setup.cfg index d4ee2cff521..7d1a8b2a616 100644 --- a/packages/syft/setup.cfg +++ b/packages/syft/setup.cfg @@ -117,9 +117,9 @@ telemetry = opentelemetry-instrumentation-botocore==0.48b0 opentelemetry-instrumentation-logging==0.48b0 opentelemetry-instrumentation-sqlalchemy==0.48b0 + opentelemetry-instrumentation-threading==0.48b0 ; opentelemetry-instrumentation-asyncio==0.48b0 ; opentelemetry-instrumentation-sqlite3==0.48b0 - ; opentelemetry-instrumentation-threading==0.48b0 ; opentelemetry-instrumentation-jinja2==0.48b0 ; opentelemetry-instrumentation-system-metrics==0.48b0 diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index eda917138db..5f9603a7c43 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -92,6 +92,7 @@ from .util.commit import __commit__ from .util.patch_ipython import patch_ipython from .util.telemetry import instrument +from .util.telemetry import instrument_threads from .util.util import autocache from .util.util import get_root_data_path from .util.version_compare import make_requires @@ -105,6 +106,8 @@ sys.path.append(str(Path(__file__))) +instrument_threads() + patch_ipython() diff --git a/packages/syft/src/syft/server/uvicorn.py b/packages/syft/src/syft/server/uvicorn.py index e1982953a32..635ef61c9a1 100644 --- a/packages/syft/src/syft/server/uvicorn.py +++ b/packages/syft/src/syft/server/uvicorn.py @@ -30,7 +30,7 @@ from ..store.db.db import DBConfig from ..util.autoreload import enable_autoreload from ..util.constants import DEFAULT_TIMEOUT -from ..util.telemetry import TRACING_ENABLED +from ..util.telemetry import instrument_fastapi from ..util.util import os_name from .datasite import Datasite from .enclave import Enclave @@ -126,14 +126,7 @@ def app_factory() -> FastAPI: allow_methods=["*"], allow_headers=["*"], ) - - if TRACING_ENABLED: - # 
third party - from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - - FastAPIInstrumentor().instrument_app(app) - print("> Added OTEL FastAPIInstrumentor") - + instrument_fastapi(app) return app diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index fea4927cb82..51b41a513ab 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -103,7 +103,7 @@ from .utils import parse_code from .utils import submit_subjobs_code -logger = logging.getLogger(name=__name__) +logger = logging.getLogger(__name__) if TYPE_CHECKING: # relative diff --git a/packages/syft/src/syft/service/queue/zmq_client.py b/packages/syft/src/syft/service/queue/zmq_client.py index 9265d9edd3d..1c68ded7537 100644 --- a/packages/syft/src/syft/service/queue/zmq_client.py +++ b/packages/syft/src/syft/service/queue/zmq_client.py @@ -1,5 +1,6 @@ # stdlib from collections import defaultdict +import logging import socketserver # relative @@ -21,6 +22,8 @@ from .zmq_consumer import ZMQConsumer from .zmq_producer import ZMQProducer +logger = logging.getLogger(__name__) + @serializable() class ZMQClientConfig(SyftObject, QueueClientConfig): @@ -77,7 +80,9 @@ def add_producer( else: port = self.config.queue_port - print(f"Adding producer for queue: {queue_name} on: {get_queue_address(port)}") + logger.info( + f"Adding producer for queue: {queue_name} on: {get_queue_address(port)}" + ) producer = ZMQProducer( queue_name=queue_name, queue_stash=queue_stash, diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 64171c5f90b..01d440879bd 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -353,9 +353,6 @@ def create_kubernetes_pool( "CREATE_PRODUCER": "False", "INMEMORY_WORKERS": "False", "OTEL_SERVICE_NAME": f"{pool_name}", - 
"OTEL_PYTHON_LOG_CORRELATION": os.environ.get( - "OTEL_PYTHON_LOG_CORRELATION" - ), "OTEL_EXPORTER_OTLP_ENDPOINT": os.environ.get( "OTEL_EXPORTER_OTLP_ENDPOINT" ), diff --git a/packages/syft/src/syft/store/blob_storage/seaweedfs.py b/packages/syft/src/syft/store/blob_storage/seaweedfs.py index 3409035906f..99ae35e6cea 100644 --- a/packages/syft/src/syft/store/blob_storage/seaweedfs.py +++ b/packages/syft/src/syft/store/blob_storage/seaweedfs.py @@ -42,26 +42,16 @@ from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.uid import UID from ...util.constants import DEFAULT_TIMEOUT -from ...util.telemetry import TRACING_ENABLED - -logger = logging.getLogger(__name__) +from ...util.telemetry import instrument_botocore MAX_QUEUE_SIZE = 100 WRITE_EXPIRATION_TIME = 900 # seconds DEFAULT_FILE_PART_SIZE = 1024**3 # 1GB DEFAULT_UPLOAD_CHUNK_SIZE = 1024 * 800 # 800KB -if TRACING_ENABLED: - try: - # third party - from opentelemetry.instrumentation.botocore import BotocoreInstrumentor - - BotocoreInstrumentor().instrument() - message = "> Added OTEL BotocoreInstrumentor" - print(message) - logger.info(message) - except Exception: # nosec - pass +logger = logging.getLogger(__name__) + +instrument_botocore() @serializable() diff --git a/packages/syft/src/syft/store/db/db.py b/packages/syft/src/syft/store/db/db.py index cc82e5a3f4e..08d85b2fa53 100644 --- a/packages/syft/src/syft/store/db/db.py +++ b/packages/syft/src/syft/store/db/db.py @@ -13,10 +13,12 @@ from ...serde.serializable import serializable from ...server.credentials import SyftVerifyKey from ...types.uid import UID +from ...util.telemetry import instrument_sqlalchemny from .schema import PostgresBase from .schema import SQLiteBase logger = logging.getLogger(__name__) +instrument_sqlalchemny() @serializable(canonical_name="DBConfig", version=1) diff --git a/packages/syft/src/syft/store/db/stash.py b/packages/syft/src/syft/store/db/stash.py index 5323f3455c8..f2083a1a424 100644 --- 
a/packages/syft/src/syft/store/db/stash.py +++ b/packages/syft/src/syft/store/db/stash.py @@ -105,7 +105,7 @@ def __init__(self, store: DBManager) -> None: self.db = store self.object_type = self.get_object_type() self.table = create_table(self.object_type, self.dialect) - self.sessionmaker = self.db.sessionmaker + self.sessionmaker: Callable[[], Session] = self.db.sessionmaker @property def dialect(self) -> sa.engine.interfaces.Dialect: diff --git a/packages/syft/src/syft/util/telemetry.py b/packages/syft/src/syft/util/telemetry.py index ef376d78fc0..99a95d991d1 100644 --- a/packages/syft/src/syft/util/telemetry.py +++ b/packages/syft/src/syft/util/telemetry.py @@ -6,18 +6,23 @@ from typing import TypeVar # relative +from .. import __version__ from .util import str_to_bool -__all__ = ["TRACING_ENABLED", "instrument"] - -logger = logging.getLogger(__name__) +__all__ = [ + "TRACING_ENABLED", + "instrument", + "instrument_fastapi", + "instrument_botocore", +] TRACING_ENABLED = str_to_bool(os.environ.get("TRACING", "False")) +logger = logging.getLogger(__name__) T = TypeVar("T", bound=Callable | type) -def noop(__func_or_class: T | None = None, /, *args: Any, **kwargs: Any) -> T: +def no_instrument(__func_or_class: T | None = None, /, *args: Any, **kwargs: Any) -> T: def noop_wrapper(__func_or_class: T) -> T: return __func_or_class @@ -27,25 +32,32 @@ def noop_wrapper(__func_or_class: T) -> T: return __func_or_class -if not TRACING_ENABLED: - instrument = noop -else: +def setup_instrumenter() -> Any: + if not TRACING_ENABLED: + return no_instrument + try: # third party from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( OTLPSpanExporter, ) + from opentelemetry.sdk.resources import OTELResourceDetector + from opentelemetry.sdk.resources import ProcessResourceDetector from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import 
BatchSpanProcessor # relative - from .trace_decorator import instrument as _instrument + from .trace_decorator import instrument + + # create a resource + resource = Resource({"syft.version": __version__}) + resource = resource.merge(OTELResourceDetector().detect()) + resource = resource.merge(ProcessResourceDetector().detect()) + logger.debug(f"OTEL resource : {resource.__dict__}") - # create a provider - service_name = os.environ.get("OTEL_SERVICE_NAME", "syft-backend") - resource = Resource.create({"service.name": service_name}) + # create a trace provider from the resource provider = TracerProvider(resource=resource) # create a span processor @@ -56,8 +68,67 @@ def noop_wrapper(__func_or_class: T) -> T: # set the global trace provider trace.set_tracer_provider(provider) - # expose the instrument decorator - instrument = _instrument + logger.info("Added TracerProvider with BatchSpanProcessor") + return instrument except Exception as e: logger.error("Failed to import opentelemetry", exc_info=e) - instrument = noop + return no_instrument + + +def instrument_fastapi(app: Any) -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + FastAPIInstrumentor().instrument_app(app) + logger.info("Added OTEL FastAPIInstrumentor") + except Exception as e: + logger.error(f"Failed to load FastAPIInstrumentor. {e}") + + +def instrument_botocore() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.botocore import BotocoreInstrumentor + + BotocoreInstrumentor().instrument() + logger.info("Added OTEL BotocoreInstrumentor") + except Exception as e: + logger.error(f"Failed to load BotocoreInstrumentor. 
{e}") + + +def instrument_threads() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.threading import ThreadingInstrumentor + + ThreadingInstrumentor().instrument() + logger.info("Added OTEL ThreadingInstrumentor") + except Exception as e: + logger.error(f"Failed to load ThreadingInstrumentor. {e}") + + +def instrument_sqlalchemny() -> None: + if not TRACING_ENABLED: + return + + try: + # third party + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + SQLAlchemyInstrumentor().instrument(enable_commenter=True, commenter_options={}) + logger.info("Added OTEL SQLAlchemyInstrumentor") + except Exception as e: + logger.error(f"Failed to load SQLAlchemyInstrumentor. {e}") + + +instrument = setup_instrumenter() diff --git a/packages/syft/src/syft/util/trace_decorator.py b/packages/syft/src/syft/util/trace_decorator.py index 719ca822afa..eaa259330cd 100644 --- a/packages/syft/src/syft/util/trace_decorator.py +++ b/packages/syft/src/syft/util/trace_decorator.py @@ -6,6 +6,7 @@ from collections.abc import Callable from functools import wraps import inspect +import threading from typing import Any from typing import ClassVar from typing import TypeVar @@ -16,6 +17,10 @@ from opentelemetry.trace import Tracer from opentelemetry.trace.span import Span +__all__ = ["instrument"] + +T = TypeVar("T", bound=Callable | type) + class TracingDecoratorOptions: class NamingSchemes: @@ -39,9 +44,6 @@ def set_default_attributes(cls, attributes: dict[str, str] | None = None) -> Non cls.default_attributes[att] = attributes[att] -T = TypeVar("T", bound=Callable | type) - - def instrument( _func_or_class: T | None = None, /, @@ -116,10 +118,13 @@ def span_decorator(func_or_class: T) -> T: tracer = existing_tracer or trace.get_tracer(func_or_class.__module__) def _set_semantic_attributes(span: Span, func: Callable) -> None: + thread = threading.current_thread() 
span.set_attribute(SpanAttributes.CODE_NAMESPACE, func.__module__) span.set_attribute(SpanAttributes.CODE_FUNCTION, func.__qualname__) span.set_attribute(SpanAttributes.CODE_FILEPATH, func.__code__.co_filename) span.set_attribute(SpanAttributes.CODE_LINENO, func.__code__.co_firstlineno) + span.set_attribute(SpanAttributes.THREAD_ID, thread.ident) + span.set_attribute(SpanAttributes.THREAD_NAME, thread.name) def _set_attributes( span: Span, attributes_dict: dict[str, str] | None = None diff --git a/packages/syft/tests/conftest.py b/packages/syft/tests/conftest.py index 56506b43fad..a3908f59eae 100644 --- a/packages/syft/tests/conftest.py +++ b/packages/syft/tests/conftest.py @@ -26,8 +26,6 @@ from syft.service.queue.queue_stash import QueueStash from syft.service.user import user -# our version of mongomock that has a fix for CodecOptions and custom TypeRegistry Support - def patch_protocol_file(filepath: Path): dp = get_data_protocol() diff --git a/tox.ini b/tox.ini index eecfd6712ae..6e767ee922b 100644 --- a/tox.ini +++ b/tox.ini @@ -910,7 +910,7 @@ commands = (kubectl logs service/backend --context k3d-$CLUSTER_NAME --namespace syft -f &) | grep -q 'Application startup complete' || true; \ fi" - # Mongo + # Postgres bash -c "if echo '{posargs}' | grep -q 'postgres'; then echo 'Checking readiness of Postgres'; ./scripts/wait_for.sh service postgres --context k3d-$CLUSTER_NAME --namespace syft; fi" # Proxy @@ -1037,22 +1037,26 @@ commands = ; restarts coredns bash -c 'kubectl delete pod -n kube-system -l k8s-app=kube-dns --context k3d-${CLUSTER_NAME}' -[testenv:dev.k8s.install.signoz] -description = Install Signoz on local Kubernetes cluster + +[testenv:dev.k8s.add.collector] +description = Install signoz/k8s-infra on Kubernetes cluster changedir = {toxinidir} passenv=HOME,USER,CLUSTER_NAME -allowlist_externals= - bash -setenv= - CLUSTER_NAME = {env:CLUSTER_NAME:test-datasite-1} - SIGNOZ_PORT = {env:SIGNOZ_PORT:3301} -commands= - bash -c 'if [ "{posargs}" ]; then 
kubectl config use-context {posargs}; fi' - bash -c 'helm repo add signoz https://charts.signoz.io && helm repo update' - bash -c 'helm install syft signoz/signoz --namespace platform --create-namespace || true' - # bash -c 'k3d cluster edit ${CLUSTER_NAME} --port-add "${SIGNOZ_PORT}:3301@loadbalancer"' - ; bash packages/grid/scripts/wait_for.sh service syft-signoz-frontend --context k3d-{env:CLUSTER_NAME} --namespace platform - +setenv = + SIGNOZ_HOST=host.k3d.internal + CLUSTER_NAME = {env:CLUSTER_NAME:syft-dev} +allowlist_externals = + helm +commands = + helm install k8s-infra k8s-infra \ + --repo https://charts.signoz.io \ + --kube-context k3d-{env:CLUSTER_NAME} \ + --set global.deploymentEnvironment=local \ + --set clusterName={env:CLUSTER_NAME} \ + --set otelCollectorEndpoint=http://{env:SIGNOZ_HOST}:4317 \ + --set otelInsecure=true \ + --set presets.otlpExporter.enabled=true \ + --set presets.loggingExporter.enabled=true [testenv:dev.k8s.start] description = Start local Kubernetes registry & cluster with k3d @@ -1075,10 +1079,13 @@ commands = -p "${CLUSTER_HTTP_PORT}:80@loadbalancer" \ --registry-use k3d-registry.localhost:5800 {posargs} && \ kubectl --context k3d-${CLUSTER_NAME} create namespace syft || true' - # -p "${SIGNOZ_PORT}:3301@loadbalancer" \ + ; patch coredns tox -e dev.k8s.patch.coredns + ; add signoz/collector + tox -e dev.k8s.add.collector + ; dump cluster info tox -e dev.k8s.info