|
| 1 | +"""DNS rebinding protection for MCP server transports.""" |
| 2 | + |
| 3 | +import logging |
| 4 | + |
| 5 | +from pydantic import BaseModel, Field |
| 6 | +from starlette.requests import Request |
| 7 | +from starlette.responses import Response |
| 8 | + |
| 9 | +logger = logging.getLogger(__name__) |
| 10 | + |
| 11 | + |
| 12 | +class TransportSecuritySettings(BaseModel): |
| 13 | + """Settings for MCP transport security features. |
| 14 | +
|
| 15 | + These settings help protect against DNS rebinding attacks by validating |
| 16 | + incoming request headers. |
| 17 | + """ |
| 18 | + |
| 19 | + enable_dns_rebinding_protection: bool = Field( |
| 20 | + default=True, |
| 21 | + description="Enable DNS rebinding protection (recommended for production)", |
| 22 | + ) |
| 23 | + |
| 24 | + allowed_hosts: list[str] = Field( |
| 25 | + default=[], |
| 26 | + description="List of allowed Host header values. Only applies when " |
| 27 | + + "enable_dns_rebinding_protection is True.", |
| 28 | + ) |
| 29 | + |
| 30 | + allowed_origins: list[str] = Field( |
| 31 | + default=[], |
| 32 | + description="List of allowed Origin header values. Only applies when " |
| 33 | + + "enable_dns_rebinding_protection is True.", |
| 34 | + ) |
| 35 | + |
| 36 | + |
| 37 | +class TransportSecurityMiddleware: |
| 38 | + """Middleware to enforce DNS rebinding protection for MCP transport endpoints.""" |
| 39 | + |
| 40 | + def __init__(self, settings: TransportSecuritySettings | None = None): |
| 41 | + # If not specified, disable DNS rebinding protection by default |
| 42 | + # for backwards compatibility |
| 43 | + self.settings = settings or TransportSecuritySettings(enable_dns_rebinding_protection=False) |
| 44 | + |
| 45 | + def _validate_host(self, host: str | None) -> bool: |
| 46 | + """Validate the Host header against allowed values.""" |
| 47 | + if not host: |
| 48 | + logger.warning("Missing Host header in request") |
| 49 | + return False |
| 50 | + |
| 51 | + # Check exact match first |
| 52 | + if host in self.settings.allowed_hosts: |
| 53 | + return True |
| 54 | + |
| 55 | + # Check wildcard port patterns |
| 56 | + for allowed in self.settings.allowed_hosts: |
| 57 | + if allowed.endswith(":*"): |
| 58 | + # Extract base host from pattern |
| 59 | + base_host = allowed[:-2] |
| 60 | + # Check if the actual host starts with base host and has a port |
| 61 | + if host.startswith(base_host + ":"): |
| 62 | + return True |
| 63 | + |
| 64 | + logger.warning(f"Invalid Host header: {host}") |
| 65 | + return False |
| 66 | + |
| 67 | + def _validate_origin(self, origin: str | None) -> bool: |
| 68 | + """Validate the Origin header against allowed values.""" |
| 69 | + # Origin can be absent for same-origin requests |
| 70 | + if not origin: |
| 71 | + return True |
| 72 | + |
| 73 | + # Check exact match first |
| 74 | + if origin in self.settings.allowed_origins: |
| 75 | + return True |
| 76 | + |
| 77 | + # Check wildcard port patterns |
| 78 | + for allowed in self.settings.allowed_origins: |
| 79 | + if allowed.endswith(":*"): |
| 80 | + # Extract base origin from pattern |
| 81 | + base_origin = allowed[:-2] |
| 82 | + # Check if the actual origin starts with base origin and has a port |
| 83 | + if origin.startswith(base_origin + ":"): |
| 84 | + return True |
| 85 | + |
| 86 | + logger.warning(f"Invalid Origin header: {origin}") |
| 87 | + return False |
| 88 | + |
| 89 | + def _validate_content_type(self, content_type: str | None) -> bool: |
| 90 | + """Validate the Content-Type header for POST requests.""" |
| 91 | + if not content_type: |
| 92 | + logger.warning("Missing Content-Type header in POST request") |
| 93 | + return False |
| 94 | + |
| 95 | + # Content-Type must start with application/json |
| 96 | + if not content_type.lower().startswith("application/json"): |
| 97 | + logger.warning(f"Invalid Content-Type header: {content_type}") |
| 98 | + return False |
| 99 | + |
| 100 | + return True |
| 101 | + |
| 102 | + async def validate_request(self, request: Request, is_post: bool = False) -> Response | None: |
| 103 | + """Validate request headers for DNS rebinding protection. |
| 104 | +
|
| 105 | + Returns None if validation passes, or an error Response if validation fails. |
| 106 | + """ |
| 107 | + # Always validate Content-Type for POST requests |
| 108 | + if is_post: |
| 109 | + content_type = request.headers.get("content-type") |
| 110 | + if not self._validate_content_type(content_type): |
| 111 | + return Response("Invalid Content-Type header", status_code=400) |
| 112 | + |
| 113 | + # Skip remaining validation if DNS rebinding protection is disabled |
| 114 | + if not self.settings.enable_dns_rebinding_protection: |
| 115 | + return None |
| 116 | + |
| 117 | + # Validate Host header |
| 118 | + host = request.headers.get("host") |
| 119 | + if not self._validate_host(host): |
| 120 | + return Response("Invalid Host header", status_code=421) |
| 121 | + |
| 122 | + # Validate Origin header |
| 123 | + origin = request.headers.get("origin") |
| 124 | + if not self._validate_origin(origin): |
| 125 | + return Response("Invalid Origin header", status_code=400) |
| 126 | + |
| 127 | + return None |
0 commit comments