Skip to content
Open
2 changes: 2 additions & 0 deletions src/core/containers/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
"""Container runtime providers."""

from .providers import ContainerProvider, KubernetesProvider, LocalDockerProvider
from .uv_provider import UVProvider

__all__ = [
"ContainerProvider",
"LocalDockerProvider",
"KubernetesProvider",
"UVProvider",
]
79 changes: 75 additions & 4 deletions src/core/containers/runtime/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ def __init__(self):
capture_output=True,
timeout=5,
)
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
except (
subprocess.CalledProcessError,
FileNotFoundError,
subprocess.TimeoutExpired,
):
raise RuntimeError(
"Docker is not available. Please install Docker Desktop or Docker Engine."
)
Expand Down Expand Up @@ -154,10 +158,13 @@ def start_container(

# Build docker run command
cmd = [
"docker", "run",
"docker",
"run",
"-d", # Detached
"--name", self._container_name,
"-p", f"{port}:8000", # Map port
"--name",
self._container_name,
"-p",
f"{port}:8000", # Map port
]

# Add environment variables
Expand Down Expand Up @@ -290,4 +297,68 @@ class KubernetesProvider(ContainerProvider):
>>> # Pod running in k8s, accessible via service or port-forward
>>> provider.stop_container()
"""

pass


class RuntimeProvider(ABC):
"""
Abstract base class for runtime providers that are not container providers.
Providers implement this interface to support different runtime platforms:
- UVProvider: Runs environments via `uv run`

The provider manages a single runtime lifecycle and provides the base URL
for connecting to it.

Example:
>>> provider = UVProvider()
>>> base_url = provider.start_container("echo-env:latest")
>>> print(base_url) # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop_container()
"""

@abstractmethod
def start(
self,
port: Optional[int] = None,
env_vars: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> str:
"""
Start a runtime from the specified image.

Args:
image: Runtime image name
port: Port to expose (if None, provider chooses)
env_vars: Environment variables for the runtime
**kwargs: Additional runtime options
"""

@abstractmethod
def stop(self) -> None:
"""
Stop the runtime.
"""
pass

@abstractmethod
def wait_for_ready(self, timeout_s: float = 30.0) -> None:
"""
Wait for the runtime to be ready to accept requests.
"""
pass

def __enter__(self) -> "RuntimeProvider":
"""
Enter the runtime provider.
"""
self.start()
return self

def __exit__(self, exc_type, exc, tb) -> None:
"""
Exit the runtime provider.
"""
self.stop()
return False
203 changes: 203 additions & 0 deletions src/core/containers/runtime/uv_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Providers for launching Hugging Face Spaces via ``uv run``."""

from __future__ import annotations

import os
import socket
import subprocess
import time
from typing import Dict, Optional

import requests

from .providers import RuntimeProvider


def _check_uv_installed() -> None:
try:
subprocess.check_output(["uv", "--version"])
except FileNotFoundError as exc:
raise RuntimeError(
"`uv` executable not found. Install uv from https://docs.astral.sh and ensure it is on PATH."
) from exc


def _find_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("", 0))
sock.listen(1)
return sock.getsockname()[1]


def _create_uv_command(
repo_id: str,
port: int,
reload: bool,
) -> list[str]:
command = [
"uv",
"run",
"--isolated",
"--with",
f"git+https://huggingface.co/spaces/{repo_id}",
"--",
"server",
"--host",
"0.0.0.0",
"--port",
str(port),
]
if reload:
command.append("--reload")
return command


def _poll_health(health_url: str, timeout_s: float) -> None:
"""Poll a health endpoint until it returns HTTP 200 or times out."""

deadline = time.time() + timeout_s
while time.time() < deadline:
try:
timeout = max(0.0001, min(deadline - time.time(), 2.0))
response = requests.get(health_url, timeout=timeout)
if response.status_code == 200:
return
except requests.RequestException:
continue

time.sleep(0.5)

raise TimeoutError(f"Server did not become ready within {timeout_s:.1f} seconds")


class UVProvider(RuntimeProvider):
"""
RuntimeProvider implementation backed by ``uv run``.

Args:
repo_id: The repository ID of the environment to run
reload: Whether to reload the environment on code changes
env_vars: Environment variables to pass to the environment
context_timeout_s: The timeout to wait for the environment to become ready

Example:
>>> provider = UVProvider(repo_id="burtenshaw/echo-cli")
>>> base_url = provider.start()
>>> print(base_url) # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop()
"""

def __init__(
self,
repo_id: str,
reload: bool = False,
env_vars: Optional[Dict[str, str]] = None,
context_timeout_s: float = 60.0,
):
"""Initialize the UVProvider."""
self.repo_id = repo_id
self.reload = reload
self.env_vars = env_vars
self.context_timeout_s = context_timeout_s
_check_uv_installed()
self._process = None
self._base_url = None

def start(
self,
port: Optional[int] = None,
env_vars: Optional[Dict[str, str]] = None,
**_: Dict[str, str],
) -> str:
"""
Start the environment via `uv run`.

Args:
port: The port to bind the environment to
env_vars: Environment variables to pass to the environment

Returns:
The base URL of the environment

Raises:
RuntimeError: If the environment is already running
"""
if self._process is not None and self._process.poll() is None:
raise RuntimeError("UVProvider is already running")

bind_port = port or _find_free_port()

command = _create_uv_command(
repo_id=self.repo_id,
port=bind_port,
reload=self.reload,
)

env = os.environ.copy()

if self.env_vars:
env.update(self.env_vars)
if env_vars:
env.update(env_vars)

try:
self._process = subprocess.Popen(command, env=env)
except OSError as exc:
raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc

self._base_url = f"http://localhost:{bind_port}"
return self._base_url

def wait_for_ready(self, timeout_s: float = 60.0) -> None:
"""
Wait for the environment to become ready.

Args:
timeout_s: The timeout to wait for the environment to become ready

Raises:
RuntimeError: If the environment is not running
TimeoutError: If the environment does not become ready within the timeout
"""
if self._process and self._process.poll() is not None:
code = self._process.returncode
raise RuntimeError(f"uv process exited prematurely with code {code}")

_poll_health(f"{self._base_url}/health", timeout_s=timeout_s)

def stop(self) -> None:
"""
Stop the environment.

Raises:
RuntimeError: If the environment is not running
"""
if self._process is None:
return

if self._process.poll() is None:
self._process.terminate()
try:
self._process.wait(timeout=10.0)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait(timeout=5.0)

self._process = None
self._base_url = None

@property
def base_url(self) -> str:
"""
The base URL of the environment.

Returns:
The base URL of the environment

Raises:
RuntimeError: If the environment is not running
"""
if self._base_url is None:
raise RuntimeError("UVProvider has not been started")
return self._base_url
45 changes: 29 additions & 16 deletions src/core/http_env_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import requests

from .client_types import StepResult
from .containers.runtime import LocalDockerProvider
from .containers.runtime import LocalDockerProvider, UVProvider

if TYPE_CHECKING:
from .containers.runtime import ContainerProvider
from .containers.runtime import ContainerProvider, RuntimeProvider

ActT = TypeVar("ActT")
ObsT = TypeVar("ObsT")
Expand Down Expand Up @@ -106,22 +106,35 @@ def from_docker_image(
return cls(base_url=base_url, provider=provider)

@classmethod
def from_hub(cls: Type[EnvClientT], repo_id: str, provider: Optional["ContainerProvider"] = None, **kwargs: Any) -> EnvClientT:
"""
Create an environment client by pulling from a Hugging Face model hub.
def from_hub(
cls: Type[EnvClientT],
repo_id: str,
*,
use_docker: bool = True,
provider: Optional["ContainerProvider" | "RuntimeProvider"] = None,
**provider_kwargs: Any,
) -> EnvClientT:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this signature very hard to understand without context since it's mixing kwargs for docker and for uv. Also I don't think one needs use_docker, provider, and runner.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion would be to remove provider and runner. Also remove project_url and connect_host. Have a single env (instead of env for docker and extra_env for uv). And use some typed dict + typing.overload so that IDEs can have correct autocompletion. Here is a simplified example:

from typing import Any, Dict, NotRequired, TypedDict, Unpack, overload


class DockerKwargs(TypedDict, total=False):
    tag: NotRequired[str]
    env: NotRequired[Dict[str, str]]


class UVProvider(TypedDict):
    host: NotRequired[str]
    port: NotRequired[int]
    reload: NotRequired[bool]
    timeout_s: NotRequired[float]
    env: NotRequired[Dict[str, str]]


@overload
def from_hub(repo_id: str, *, use_docker: bool = True, **kwargs: Unpack[DockerKwargs]) -> str: ...


@overload
def from_hub(repo_id: str, *, use_docker: bool = False, **kwargs: Unpack[UVProvider]) -> str: ...


def from_hub(repo_id: str, *, use_docker: bool = False, **kwargs: Any) -> str:
    raise NotImplementedError()
Image Image

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I'm not a fan of overloads and typed dict but it's the only solution I see to correctly document the signature while keeping a single method.

Another solution is to have from_hub_docker and from_hub_uv (more explicit but less elegant)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this. I've simplified the signatures right down, but I haven't added type overloading in this PR.

"""Create a client from a Hugging Face Space.

Set ``use_docker=True`` to launch the registry image with a container
provider. The default ``use_docker=False`` runs the Space locally using
``uv run`` through :class:`UVProvider`.
"""

if provider is None:
provider = LocalDockerProvider()

if "tag" in kwargs:
tag = kwargs["tag"]

if use_docker:
tag = provider_kwargs.pop("tag", "latest")
image = f"registry.hf.space/{repo_id.replace('/', '-')}:{tag}"
return cls.from_docker_image(image, provider=provider, **provider_kwargs)
else:
tag = "latest"

base_url = f"registry.hf.space/{repo_id.replace('/', '-')}:{tag}"

return cls.from_docker_image(image=base_url, provider=provider)
provider: RuntimeProvider = UVProvider(
repo_id=repo_id,
**provider_kwargs,
)
base_url = provider.start()
timeout_s = provider_kwargs.pop("timeout_s", 60.0)
provider.wait_for_ready(base_url=provider.base_url, timeout_s=timeout_s)

return cls(base_url=base_url, provider=provider)

@abstractmethod
def _step_payload(self, action: ActT) -> dict:
Expand Down