Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,13 @@ def apply_deployment_info(

if deployment_info.route_prefix is not None:
config = deployment_info.deployment_config
# Try to get route_patterns from deployment state first (most up-to-date),
# otherwise fall back to existing endpoint patterns
route_patterns = (
self._deployment_state_manager.get_deployment_route_patterns(
deployment_id
)
)
self._endpoint_state.update_endpoint(
deployment_id,
# The current meaning of the "is_cross_language" field is ambiguous.
Expand All @@ -550,6 +557,7 @@ def apply_deployment_info(
route=deployment_info.route_prefix,
app_is_cross_language=config.deployment_language
!= DeploymentLanguage.PYTHON,
route_patterns=route_patterns,
),
)
else:
Expand Down
32 changes: 32 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,40 @@ def __str__(self) -> str:

@dataclass
class EndpointInfo:
"""Metadata about a deployment's HTTP/gRPC endpoint.

This represents the public routing interface for a deployment. It's created when
a deployment is registered with a route prefix and broadcast to all proxies via
the long poll mechanism (ROUTE_TABLE namespace).

Flow:
1. Created in ApplicationState when deployment is applied
2. Stored in EndpointState (controller's source of truth)
3. Broadcast to all ProxyActors via long poll (ROUTE_TABLE)
4. Cached in ProxyRouter for request routing
5. Used to route incoming HTTP/gRPC requests to correct deployments
6. Used to determine route patterns for accurate metrics tagging

Key Difference from DeploymentInfo:
- EndpointInfo: Just HTTP/gRPC routing metadata (shared with proxies)
- DeploymentInfo: Complete deployment config (replicas, resources, etc.)

Attributes:
route: The route prefix for this deployment (e.g., "/api").
app_is_cross_language: Whether the deployment uses a different language
than the proxy (e.g., Java deployment with Python proxy). This affects
how the proxy serializes/deserializes requests.
route_patterns: List of all ASGI route patterns for this deployment
(e.g., ["/", "/users/{user_id}", "/items/{item_id}/details"]).
Used by proxies to match incoming requests to specific route patterns
for accurate metrics tagging. This avoids high cardinality by using
parameterized patterns instead of individual request paths.
Only populated for deployments with ASGI apps (FastAPI/Starlette).
"""

route: str
app_is_cross_language: bool = False
route_patterns: Optional[List[str]] = None


# Keep in sync with ServeReplicaState in dashboard/client/src/type/serve.ts
Expand Down
26 changes: 25 additions & 1 deletion python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def __init__(
self._initialization_latency_s: Optional[float] = None
self._internal_grpc_port: Optional[int] = None
self._docs_path: Optional[str] = None
self._route_patterns: Optional[List[str]] = None
# Rank assigned to the replica.
self._rank: Optional[int] = None
# Populated in `on_scheduled` or `recover`.
Expand Down Expand Up @@ -348,6 +349,10 @@ def deployment_config(self) -> DeploymentConfig:
def docs_path(self) -> Optional[str]:
return self._docs_path

@property
def route_patterns(self) -> Optional[List[str]]:
return self._route_patterns

@property
def max_ongoing_requests(self) -> int:
return self.deployment_config.max_ongoing_requests
Expand Down Expand Up @@ -768,6 +773,7 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
self._http_port,
self._grpc_port,
self._rank,
self._route_patterns,
) = ray.get(self._ready_obj_ref)
except RayTaskError as e:
logger.exception(
Expand Down Expand Up @@ -1126,6 +1132,10 @@ def version(self):
def docs_path(self) -> Optional[str]:
return self._actor.docs_path

@property
def route_patterns(self) -> Optional[List[str]]:
return self._actor.route_patterns

@property
def actor_id(self) -> str:
return self._actor.actor_id
Expand Down Expand Up @@ -1773,6 +1783,7 @@ def __init__(
self._last_broadcasted_deployment_config = None

self._docs_path: Optional[str] = None
self._route_patterns: Optional[List[str]] = None

def should_autoscale(self) -> bool:
"""
Expand Down Expand Up @@ -1865,6 +1876,10 @@ def app_name(self) -> str:
def docs_path(self) -> Optional[str]:
return self._docs_path

@property
def route_patterns(self) -> Optional[List[str]]:
return self._route_patterns

@property
def _failed_to_start_threshold(self) -> int:
# Use global override if set, otherwise use deployment config
Expand Down Expand Up @@ -2538,9 +2553,10 @@ def _check_startup_replicas(
)

# if replica version is the same as the target version,
# we update the docs path
# we update the docs path and route patterns
if replica.version == self._target_state.version:
self._docs_path = replica.docs_path
self._route_patterns = replica.route_patterns

# Log the startup latency.
e2e_replica_start_latency = time.time() - replica._start_time
Expand Down Expand Up @@ -3191,6 +3207,14 @@ def get_deployment_docs_path(self, deployment_id: DeploymentID) -> Optional[str]
if deployment_id in self._deployment_states:
return self._deployment_states[deployment_id].docs_path

def get_deployment_route_patterns(
self, deployment_id: DeploymentID
) -> Optional[List[str]]:
"""Get route patterns for a deployment if available."""
if deployment_id in self._deployment_states:
return self._deployment_states[deployment_id].route_patterns
return None

def get_deployment_target_num_replicas(
self, deployment_id: DeploymentID
) -> Optional[int]:
Expand Down
19 changes: 11 additions & 8 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,17 @@ def _get_response_handler_info(
if version.parse(starlette.__version__) < version.parse("0.33.0"):
proxy_request.set_path(route_path.replace(route_prefix, "", 1))

# NOTE(edoakes): we use the route_prefix instead of the full HTTP path
# for logs & metrics to avoid high cardinality.
# See: https://github.com/ray-project/ray/issues/47999
logs_and_metrics_route = (
route_prefix
if self.protocol == RequestProtocol.HTTP
else handle.deployment_id.app_name
)
# NOTE(abrar): we try to match to a specific route pattern (e.g., /api/{user_id})
# for logs & metrics when available. If no pattern matches, we fall back to the
# route_prefix to avoid high cardinality.
# See: https://github.com/ray-project/ray/issues/47999 and
# https://github.com/ray-project/ray/issues/52212
if self.protocol == RequestProtocol.HTTP:
logs_and_metrics_route = self.proxy_router.match_route_pattern(
route_prefix, proxy_request.scope
)
else:
logs_and_metrics_route = handle.deployment_id.app_name
internal_request_id = generate_request_id()
handle, request_id = self.setup_request_context_and_handle(
app_name=handle.deployment_id.app_name,
Expand Down
82 changes: 81 additions & 1 deletion python/ray/serve/_private/proxy_router.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import logging
from typing import Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from starlette.types import Scope

from ray.serve._private.common import ApplicationName, DeploymentID, EndpointInfo
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.thirdparty.get_asgi_route_name import get_asgi_route_name
from ray.serve.handle import DeploymentHandle

logger = logging.getLogger(SERVE_LOGGER_NAME)
Expand Down Expand Up @@ -38,6 +44,13 @@ def __init__(
# Endpoints info associated with endpoints.
self.endpoints: Dict[DeploymentID, EndpointInfo] = dict()

# Map of route prefix to list of route patterns for that endpoint
# Used to match incoming requests to ASGI route patterns for metrics
self.route_patterns: Dict[str, List[str]] = dict()
# Cache of mock Starlette apps for route pattern matching
# Key: route prefix, Value: pre-built Starlette app with routes
self._route_pattern_apps: Dict[str, Any] = dict()

def ready_for_traffic(self, is_head: bool) -> Tuple[bool, str]:
"""Whether the proxy router is ready to serve traffic.

Expand Down Expand Up @@ -80,10 +93,13 @@ def update_routes(self, endpoints: Dict[DeploymentID, EndpointInfo]):
routes = []
route_info = {}
app_to_is_cross_language = {}
route_patterns = {}
for endpoint, info in endpoints.items():
routes.append(info.route)
route_info[info.route] = endpoint
app_to_is_cross_language[endpoint.app_name] = info.app_is_cross_language
if info.route_patterns:
route_patterns[info.route] = info.route_patterns
if endpoint in self.handles:
existing_handles.remove(endpoint)
else:
Expand All @@ -103,6 +119,9 @@ def update_routes(self, endpoints: Dict[DeploymentID, EndpointInfo]):
self.sorted_routes = sorted(routes, key=lambda x: len(x), reverse=True)
self.route_info = route_info
self.app_to_is_cross_language = app_to_is_cross_language
self.route_patterns = route_patterns
# Invalidate cached mock apps when route patterns change
self._route_pattern_apps.clear()

def match_route(
self, target_route: str
Expand Down Expand Up @@ -163,3 +182,64 @@ def get_handle_for_endpoint(
)

return None

def match_route_pattern(self, route_prefix: str, asgi_scope: Scope) -> str:
"""Match an incoming request to a specific route pattern.

This attempts to match the request path to a route pattern (e.g., /api/{user_id})
rather than just the route prefix. This provides more granular metrics.

The mock Starlette app is cached per route_prefix for performance, avoiding
the overhead of recreating the app and routes on every request.

Args:
route_prefix: The matched route prefix from match_route()
asgi_scope: The ASGI scope containing the request path and method

Returns:
The matched route pattern if available, otherwise the route_prefix
"""
# If we don't have route patterns for this prefix, return the prefix
if route_prefix not in self.route_patterns:
return route_prefix

patterns = self.route_patterns[route_prefix]
if not patterns:
return route_prefix

# Get or create the cached mock app for this route_prefix
mock_app = self._route_pattern_apps.get(route_prefix)
if mock_app is None:
try:
# Create routes from patterns
# We use a dummy endpoint since we only need pattern matching
async def dummy_endpoint(request: Request):
pass

routes = [Route(pattern, dummy_endpoint) for pattern in patterns]
mock_app = Starlette(routes=routes)

# Cache the mock app for future requests
self._route_pattern_apps[route_prefix] = mock_app
except Exception:
# If app creation fails, fall back to route prefix
logger.debug(
f"Failed to create mock app for route pattern matching: {route_prefix}",
exc_info=True,
)
return route_prefix

# Use the cached mock app to match the route pattern
try:
matched = get_asgi_route_name(mock_app, asgi_scope)
if matched:
return matched
except Exception:
# If matching fails for any reason, fall back to route prefix
logger.debug(
f"Failed to match route pattern for {route_prefix}",
exc_info=True,
)

# Fall back to route prefix if no pattern matched
return route_prefix
18 changes: 17 additions & 1 deletion python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Callable,
Dict,
Generator,
List,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -93,7 +94,10 @@
)
from ray.serve._private.metrics_utils import InMemoryMetricsStore, MetricsPusher
from ray.serve._private.task_consumer import TaskConsumerWrapper
from ray.serve._private.thirdparty.get_asgi_route_name import get_asgi_route_name
from ray.serve._private.thirdparty.get_asgi_route_name import (
extract_route_patterns,
get_asgi_route_name,
)
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
Semaphore,
Expand Down Expand Up @@ -121,6 +125,9 @@
Optional[int],
Optional[str],
int,
int,
int, # rank
Optional[List[str]], # route_patterns
]


Expand Down Expand Up @@ -573,6 +580,14 @@ def get_num_ongoing_requests(self) -> int:

def get_metadata(self) -> ReplicaMetadata:
current_rank = ray.serve.context._get_internal_replica_context().rank
# Extract route patterns from ASGI app if available
route_patterns = None
if self._user_callable_asgi_app is not None:
# _user_callable_asgi_app is the actual ASGI app (FastAPI/Starlette)
# It's set when initialize_callable() returns an ASGI app
if hasattr(self._user_callable_asgi_app, "routes"):
route_patterns = extract_route_patterns(self._user_callable_asgi_app)

return (
self._version.deployment_config,
self._version,
Expand All @@ -582,6 +597,7 @@ def get_metadata(self) -> ReplicaMetadata:
self._http_port,
self._grpc_port,
current_rank,
route_patterns,
)

def _set_internal_replica_context(
Expand Down
Loading