Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update otel instrumentation #9285

Merged
merged 20 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
"metadata": {},
"outputs": [],
"source": [
"# overwrite it for now Mongo ignore\n",
"# overwrite it for now Postgres ignore\n",
"result = high_client.api.services.worker_image.submit(worker_config=docker_config)\n",
"result"
]
Expand Down
2 changes: 1 addition & 1 deletion packages/grid/backend/backend.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ ENV \
APPDIR="/root/app" \
SERVER_NAME="default_server_name" \
SERVER_TYPE="datasite" \
SERVICE_NAME="backend" \
SERVER_SIDE_TYPE="high" \
RELEASE="production" \
DEV_MODE="False" \
DEBUGGER_ENABLED="False" \
Expand Down
27 changes: 18 additions & 9 deletions packages/grid/backend/grid/logging.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
version: 1
disable_existing_loggers: True

formatters:
default:
format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
uvicorn.default:
"()": uvicorn.logging.DefaultFormatter
format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
uvicorn.access:
"()": "uvicorn.logging.AccessFormatter"
format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"

handlers:
default:
formatter: default
Expand All @@ -24,23 +24,32 @@ handlers:
formatter: uvicorn.access
class: logging.StreamHandler
stream: ext://sys.stdout

loggers:
# uvicorn loggers
uvicorn.error:
level: INFO
handlers:
- uvicorn.default
propagate: no
propagate: false
uvicorn.access:
level: INFO
handlers:
- uvicorn.access
propagate: no
propagate: false
# syft & grid loggers
syft:
level: INFO
handlers:
- default
propagate: no
root:
level: INFO
handlers:
- default
propagate: false
grid:
level: INFO
handlers:
- default
propagate: false
# root logger
# do not set level, else pip packages will be affected
"":
handlers:
- default
147 changes: 58 additions & 89 deletions packages/grid/backend/grid/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,129 +6,98 @@
# third party
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware

# syft absolute
from syft.protocol.data_protocol import stage_protocol_changes
from syft.util.telemetry import instrument_fastapi

# server absolute
from grid.api.router import api_router
from grid.core.config import settings
from grid.core.server import worker

# logger => grid.main
logger = logging.getLogger(__name__)


class FastAPILogFilter(logging.Filter):
HEALTHCHECK_ENDPOINT = f"{settings.API_V2_STR}/?probe="

class EndpointFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find("/api/v2/?probe=livenessProbe") == -1
return record.getMessage().find(self.HEALTHCHECK_ENDPOINT) == -1


def on_app_startup(app: FastAPI) -> None:
if settings.DEV_MODE:
# syft absolute
from syft.protocol.data_protocol import stage_protocol_changes

logger.info("Staging protocol changes...")
status = stage_protocol_changes()
logger.info(f"Staging protocol result: {status}")


def on_app_shutdown(app: FastAPI) -> None:
worker.stop()
logger.info("Worker Stopped")

logger = logging.getLogger("uvicorn.error")
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())

def get_middlewares() -> FastAPI:
middlewares = []

# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
middlewares.append(
Middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
)

return middlewares


@asynccontextmanager
async def lifespan(app: FastAPI) -> Any:
try:
on_app_startup(app)
yield
finally:
worker.stop()
logger.info("Worker Stop")
on_app_shutdown(app)


app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V2_STR}/openapi.json",
lifespan=lifespan,
)
def create_app() -> FastAPI:
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V2_STR}/openapi.json",
lifespan=lifespan,
middleware=get_middlewares(),
)

# instrument app
instrument_fastapi(app)

# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# patch logger to ignore healthcheck logs
logging.getLogger("uvicorn.access").addFilter(FastAPILogFilter())

# add Syft API routes
app.include_router(api_router, prefix=settings.API_V2_STR)

app.include_router(api_router, prefix=settings.API_V2_STR)
logger.info("Included routes, app should now be reachable")
return app


if settings.DEV_MODE:
logger.info("Staging protocol changes...")
status = stage_protocol_changes()
logger.info(f"Staging protocol result: {status}")
app = create_app()


# needed for Google Kubernetes Engine LoadBalancer Healthcheck
@app.get(
"/",
name="healthcheck",
status_code=200,
response_class=JSONResponse,
)
def healthcheck() -> dict[str, str]:
"""
Currently, all service backends must satisfy either of the following requirements to
pass the HTTP health checks sent to it from the GCE loadbalancer: 1. Respond with a
200 on '/'. The content does not matter. 2. Expose an arbitrary url as a readiness
probe on the pods backing the Service.
"""
return {"status": "ok"}


if settings.TRACING_ENABLED:
try:
# stdlib
import os

endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", None)
# third party
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs import LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource

logger_provider = LoggerProvider(
resource=Resource.create(
{
"service.name": "backend-container",
}
),
)
set_logger_provider(logger_provider)

exporter = OTLPLogExporter(insecure=True, endpoint=endpoint)

logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)

# Attach OTLP handler to root logger
logging.getLogger().addHandler(handler)
logger = logging.getLogger(__name__)
message = "> Added OTEL BatchLogRecordProcessor"
print(message)
logger.info(message)

except Exception as e:
print(f"Failed to load OTLPLogExporter. {e}")

# third party
try:
# third party
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

FastAPIInstrumentor.instrument_app(app)
message = "> Added OTEL FastAPIInstrumentor"
print(message)
logger = logging.getLogger(__name__)
logger.info(message)
except Exception as e:
error = f"Failed to load FastAPIInstrumentor. {e}"
print(error)
logger = logging.getLogger(__name__)
logger.error(message)
34 changes: 24 additions & 10 deletions packages/grid/backend/grid/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ APP_MODULE=grid.main:app
LOG_LEVEL=${LOG_LEVEL:-info}
HOST=${HOST:-0.0.0.0}
PORT=${PORT:-80}
SERVER_TYPE=${SERVER_TYPE:-datasite}
APPDIR=${APPDIR:-$HOME/app}
RELOAD=""
ROOT_PROC=""

export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json}
export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key)
export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid)

if [[ ${DEV_MODE} == "True" ]];
then
echo "Hot-reload Enabled"
Expand All @@ -28,20 +31,31 @@ fi

if [[ ${TRACING} == "true" ]];
then
# TODOs:
# ? Kubernetes OTel operator is recommended by signoz
export OTEL_PYTHON_LOG_CORRELATION=${OTEL_PYTHON_LOG_CORRELATION:-true}
echo "OpenTelemetry Enabled. Endpoint=$OTEL_EXPORTER_OTLP_ENDPOINT Protocol=$OTEL_EXPORTER_OTLP_PROTOCOL"
# TODO: Polish these values up
DEPLOYMENT_ENV="$SERVER_TYPE-$SERVER_SIDE_TYPE"
RESOURCE_ATTRS=(
"deployment.environment=$DEPLOYMENT_ENV"
"service.namespace=$DEPLOYMENT_ENV"
"service.instance.id=$SERVER_UID"
"k8s.pod.name=${K8S_POD_NAME:-"none"}"
"k8s.namespace.name=${K8S_NAMESPACE:"none"}"
"syft.server.uid=$SERVER_UID"
"syft.server.type=$SERVER_TYPE"
"syft.server.side.type=$SERVER_SIDE_TYPE"
)

# environ is always prefixed with the server type
export OTEL_SERVICE_NAME="${DEPLOYMENT_ENV}-${OTEL_SERVICE_NAME:-"backend"}"
export OTEL_RESOURCE_ATTRIBUTES=$(IFS=, ; echo "${RESOURCE_ATTRS[*]}")

echo "OpenTelemetry Enabled"
env | grep OTEL_
else
echo "OpenTelemetry Disabled"
fi

export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json}
export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key)
export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid)
export SERVER_TYPE=$SERVER_TYPE

echo "SERVER_UID=$SERVER_UID"
echo "SERVER_TYPE=$SERVER_TYPE"
echo "SERVER_SIDE_TYPE=$SERVER_SIDE_TYPE"

exec $ROOT_PROC uvicorn $RELOAD --host $HOST --port $PORT --log-config=$APPDIR/grid/logging.yaml "$APP_MODULE"
2 changes: 1 addition & 1 deletion packages/grid/helm/syft/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ postgres:
# PVC storage size
storageSize: 5Gi

# Mongo secret name. Override this if you want to use a self-managed secret.
# Postgres secret name. Override this if you want to use a self-managed secret.
secretKeyName: postgres-secret

# default/custom secret raw values
Expand Down
2 changes: 1 addition & 1 deletion packages/syft/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ telemetry =
opentelemetry-instrumentation-botocore==0.48b0
opentelemetry-instrumentation-logging==0.48b0
opentelemetry-instrumentation-sqlalchemy==0.48b0
opentelemetry-instrumentation-threading==0.48b0
; opentelemetry-instrumentation-asyncio==0.48b0
; opentelemetry-instrumentation-sqlite3==0.48b0
; opentelemetry-instrumentation-threading==0.48b0
; opentelemetry-instrumentation-jinja2==0.48b0
; opentelemetry-instrumentation-system-metrics==0.48b0

Expand Down
3 changes: 3 additions & 0 deletions packages/syft/src/syft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
from .util.commit import __commit__
from .util.patch_ipython import patch_ipython
from .util.telemetry import instrument
from .util.telemetry import instrument_threads
from .util.util import autocache
from .util.util import get_root_data_path
from .util.version_compare import make_requires
Expand All @@ -103,6 +104,8 @@
sys.path.append(str(Path(__file__)))


instrument_threads()

patch_ipython()


Expand Down
11 changes: 2 additions & 9 deletions packages/syft/src/syft/server/uvicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ..store.db.db import DBConfig
from ..util.autoreload import enable_autoreload
from ..util.constants import DEFAULT_TIMEOUT
from ..util.telemetry import TRACING_ENABLED
from ..util.telemetry import instrument_fastapi
from ..util.util import os_name
from .datasite import Datasite
from .enclave import Enclave
Expand Down Expand Up @@ -126,14 +126,7 @@ def app_factory() -> FastAPI:
allow_methods=["*"],
allow_headers=["*"],
)

if TRACING_ENABLED:
# third party
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

FastAPIInstrumentor().instrument_app(app)
print("> Added OTEL FastAPIInstrumentor")

instrument_fastapi(app)
return app


Expand Down
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/code/user_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
from .utils import parse_code
from .utils import submit_subjobs_code

logger = logging.getLogger(name=__name__)
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
# relative
Expand Down
Loading
Loading