Skip to content

Commit

Permalink
Merge pull request #9285 from OpenMined/yash/collector
Browse files Browse the repository at this point in the history
Update otel instrumentation
  • Loading branch information
yashgorana authored Sep 24, 2024
2 parents 3ace0ad + 283f747 commit ce97a4a
Show file tree
Hide file tree
Showing 19 changed files with 239 additions and 176 deletions.
2 changes: 1 addition & 1 deletion notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
"metadata": {},
"outputs": [],
"source": [
"# overwrite it for now Mongo ignore\n",
"# overwrite it for now Postgres ignore\n",
"result = high_client.api.services.worker_image.submit(worker_config=docker_config)\n",
"result"
]
Expand Down
2 changes: 1 addition & 1 deletion packages/grid/backend/backend.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ ENV \
APPDIR="/root/app" \
SERVER_NAME="default_server_name" \
SERVER_TYPE="datasite" \
SERVICE_NAME="backend" \
SERVER_SIDE_TYPE="high" \
RELEASE="production" \
DEV_MODE="False" \
DEBUGGER_ENABLED="False" \
Expand Down
27 changes: 18 additions & 9 deletions packages/grid/backend/grid/logging.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
version: 1
disable_existing_loggers: True

formatters:
default:
format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
uvicorn.default:
"()": uvicorn.logging.DefaultFormatter
format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
uvicorn.access:
"()": "uvicorn.logging.AccessFormatter"
format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"

handlers:
default:
formatter: default
Expand All @@ -24,23 +24,32 @@ handlers:
formatter: uvicorn.access
class: logging.StreamHandler
stream: ext://sys.stdout

loggers:
# uvicorn loggers
uvicorn.error:
level: INFO
handlers:
- uvicorn.default
propagate: no
propagate: false
uvicorn.access:
level: INFO
handlers:
- uvicorn.access
propagate: no
propagate: false
# syft & grid loggers
syft:
level: INFO
handlers:
- default
propagate: no
root:
level: INFO
handlers:
- default
propagate: false
grid:
level: INFO
handlers:
- default
propagate: false
# root logger
# do not set level, else pip packages will be affected
"":
handlers:
- default
147 changes: 58 additions & 89 deletions packages/grid/backend/grid/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,129 +6,98 @@
# third party
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware

# syft absolute
from syft.protocol.data_protocol import stage_protocol_changes
from syft.util.telemetry import instrument_fastapi

# server absolute
from grid.api.router import api_router
from grid.core.config import settings
from grid.core.server import worker

# logger => grid.main
logger = logging.getLogger(__name__)


class FastAPILogFilter(logging.Filter):
HEALTHCHECK_ENDPOINT = f"{settings.API_V2_STR}/?probe="

class EndpointFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find("/api/v2/?probe=livenessProbe") == -1
return record.getMessage().find(self.HEALTHCHECK_ENDPOINT) == -1


def on_app_startup(app: FastAPI) -> None:
if settings.DEV_MODE:
# syft absolute
from syft.protocol.data_protocol import stage_protocol_changes

logger.info("Staging protocol changes...")
status = stage_protocol_changes()
logger.info(f"Staging protocol result: {status}")


def on_app_shutdown(app: FastAPI) -> None:
worker.stop()
logger.info("Worker Stopped")

logger = logging.getLogger("uvicorn.error")
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())

def get_middlewares() -> FastAPI:
middlewares = []

# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
middlewares.append(
Middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
)

return middlewares


@asynccontextmanager
async def lifespan(app: FastAPI) -> Any:
try:
on_app_startup(app)
yield
finally:
worker.stop()
logger.info("Worker Stop")
on_app_shutdown(app)


app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V2_STR}/openapi.json",
lifespan=lifespan,
)
def create_app() -> FastAPI:
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V2_STR}/openapi.json",
lifespan=lifespan,
middleware=get_middlewares(),
)

# instrument app
instrument_fastapi(app)

# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# patch logger to ignore healthcheck logs
logging.getLogger("uvicorn.access").addFilter(FastAPILogFilter())

# add Syft API routes
app.include_router(api_router, prefix=settings.API_V2_STR)

app.include_router(api_router, prefix=settings.API_V2_STR)
logger.info("Included routes, app should now be reachable")
return app


if settings.DEV_MODE:
logger.info("Staging protocol changes...")
status = stage_protocol_changes()
logger.info(f"Staging protocol result: {status}")
app = create_app()


# needed for Google Kubernetes Engine LoadBalancer Healthcheck
@app.get(
"/",
name="healthcheck",
status_code=200,
response_class=JSONResponse,
)
def healthcheck() -> dict[str, str]:
"""
Currently, all service backends must satisfy either of the following requirements to
pass the HTTP health checks sent to it from the GCE loadbalancer: 1. Respond with a
200 on '/'. The content does not matter. 2. Expose an arbitrary url as a readiness
probe on the pods backing the Service.
"""
return {"status": "ok"}


if settings.TRACING_ENABLED:
try:
# stdlib
import os

endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", None)
# third party
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs import LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource

logger_provider = LoggerProvider(
resource=Resource.create(
{
"service.name": "backend-container",
}
),
)
set_logger_provider(logger_provider)

exporter = OTLPLogExporter(insecure=True, endpoint=endpoint)

logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)

# Attach OTLP handler to root logger
logging.getLogger().addHandler(handler)
logger = logging.getLogger(__name__)
message = "> Added OTEL BatchLogRecordProcessor"
print(message)
logger.info(message)

except Exception as e:
print(f"Failed to load OTLPLogExporter. {e}")

# third party
try:
# third party
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

FastAPIInstrumentor.instrument_app(app)
message = "> Added OTEL FastAPIInstrumentor"
print(message)
logger = logging.getLogger(__name__)
logger.info(message)
except Exception as e:
error = f"Failed to load FastAPIInstrumentor. {e}"
print(error)
logger = logging.getLogger(__name__)
logger.error(message)
34 changes: 24 additions & 10 deletions packages/grid/backend/grid/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ APP_MODULE=grid.main:app
LOG_LEVEL=${LOG_LEVEL:-info}
HOST=${HOST:-0.0.0.0}
PORT=${PORT:-80}
SERVER_TYPE=${SERVER_TYPE:-datasite}
APPDIR=${APPDIR:-$HOME/app}
RELOAD=""
ROOT_PROC=""

export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json}
export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key)
export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid)

if [[ ${DEV_MODE} == "True" ]];
then
echo "Hot-reload Enabled"
Expand All @@ -28,20 +31,31 @@ fi

if [[ ${TRACING} == "true" ]];
then
# TODOs:
# ? Kubernetes OTel operator is recommended by signoz
export OTEL_PYTHON_LOG_CORRELATION=${OTEL_PYTHON_LOG_CORRELATION:-true}
echo "OpenTelemetry Enabled. Endpoint=$OTEL_EXPORTER_OTLP_ENDPOINT Protocol=$OTEL_EXPORTER_OTLP_PROTOCOL"
# TODO: Polish these values up
DEPLOYMENT_ENV="$SERVER_TYPE-$SERVER_SIDE_TYPE"
RESOURCE_ATTRS=(
"deployment.environment=$DEPLOYMENT_ENV"
"service.namespace=$DEPLOYMENT_ENV"
"service.instance.id=$SERVER_UID"
"k8s.pod.name=${K8S_POD_NAME:-"none"}"
"k8s.namespace.name=${K8S_NAMESPACE:"none"}"
"syft.server.uid=$SERVER_UID"
"syft.server.type=$SERVER_TYPE"
"syft.server.side.type=$SERVER_SIDE_TYPE"
)

# environ is always prefixed with the server type
export OTEL_SERVICE_NAME="${DEPLOYMENT_ENV}-${OTEL_SERVICE_NAME:-"backend"}"
export OTEL_RESOURCE_ATTRIBUTES=$(IFS=, ; echo "${RESOURCE_ATTRS[*]}")

echo "OpenTelemetry Enabled"
env | grep OTEL_
else
echo "OpenTelemetry Disabled"
fi

export CREDENTIALS_PATH=${CREDENTIALS_PATH:-$HOME/data/creds/credentials.json}
export SERVER_PRIVATE_KEY=$(python $APPDIR/grid/bootstrap.py --private_key)
export SERVER_UID=$(python $APPDIR/grid/bootstrap.py --uid)
export SERVER_TYPE=$SERVER_TYPE

echo "SERVER_UID=$SERVER_UID"
echo "SERVER_TYPE=$SERVER_TYPE"
echo "SERVER_SIDE_TYPE=$SERVER_SIDE_TYPE"

exec $ROOT_PROC uvicorn $RELOAD --host $HOST --port $PORT --log-config=$APPDIR/grid/logging.yaml "$APP_MODULE"
2 changes: 1 addition & 1 deletion packages/grid/helm/syft/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ postgres:
# PVC storage size
storageSize: 5Gi

# Mongo secret name. Override this if you want to use a self-managed secret.
# Postgres secret name. Override this if you want to use a self-managed secret.
secretKeyName: postgres-secret

# default/custom secret raw values
Expand Down
2 changes: 1 addition & 1 deletion packages/syft/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ telemetry =
opentelemetry-instrumentation-botocore==0.48b0
opentelemetry-instrumentation-logging==0.48b0
opentelemetry-instrumentation-sqlalchemy==0.48b0
opentelemetry-instrumentation-threading==0.48b0
; opentelemetry-instrumentation-asyncio==0.48b0
; opentelemetry-instrumentation-sqlite3==0.48b0
; opentelemetry-instrumentation-threading==0.48b0
; opentelemetry-instrumentation-jinja2==0.48b0
; opentelemetry-instrumentation-system-metrics==0.48b0

Expand Down
3 changes: 3 additions & 0 deletions packages/syft/src/syft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
from .util.commit import __commit__
from .util.patch_ipython import patch_ipython
from .util.telemetry import instrument
from .util.telemetry import instrument_threads
from .util.util import autocache
from .util.util import get_root_data_path
from .util.version_compare import make_requires
Expand All @@ -105,6 +106,8 @@
sys.path.append(str(Path(__file__)))


instrument_threads()

patch_ipython()


Expand Down
11 changes: 2 additions & 9 deletions packages/syft/src/syft/server/uvicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ..store.db.db import DBConfig
from ..util.autoreload import enable_autoreload
from ..util.constants import DEFAULT_TIMEOUT
from ..util.telemetry import TRACING_ENABLED
from ..util.telemetry import instrument_fastapi
from ..util.util import os_name
from .datasite import Datasite
from .enclave import Enclave
Expand Down Expand Up @@ -126,14 +126,7 @@ def app_factory() -> FastAPI:
allow_methods=["*"],
allow_headers=["*"],
)

if TRACING_ENABLED:
# third party
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

FastAPIInstrumentor().instrument_app(app)
print("> Added OTEL FastAPIInstrumentor")

instrument_fastapi(app)
return app


Expand Down
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/code/user_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
from .utils import parse_code
from .utils import submit_subjobs_code

logger = logging.getLogger(name=__name__)
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
# relative
Expand Down
Loading

0 comments on commit ce97a4a

Please sign in to comment.