diff --git a/pyproject.toml b/pyproject.toml index 3dd4f175..ccb7dcaa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ version = "0.1.6" license = { file = "LICENSE" } dependencies = [ "asyncio", - "pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.4", + "pytrickle @ git+https://github.com/livepeer/pytrickle.git@3f0fda21e2f2055e8f903a0430cfe8e782c0a28d", "comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba", "aiortc", "aiohttp", diff --git a/requirements.txt b/requirements.txt index b35bb52c..0ed94ddb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ asyncio -pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.4 +pytrickle @ git+https://github.com/livepeer/pytrickle.git@3f0fda21e2f2055e8f903a0430cfe8e782c0a28d comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba aiortc aiohttp diff --git a/server/byoc.py b/server/byoc.py index 879ac287..9451808c 100644 --- a/server/byoc.py +++ b/server/byoc.py @@ -76,7 +76,7 @@ def main(): if args.comfyui_log_level: log_level = logging._nameToLevel.get(args.comfyui_log_level.upper()) logging.getLogger("comfy").setLevel(log_level) - + # Add ComfyStream timeout filter to suppress verbose execution logging logging.getLogger("comfy.cmd.execution").addFilter(ComfyStreamTimeoutFilter()) @@ -85,7 +85,7 @@ def force_print(*args, **kwargs): sys.stdout.flush() logger.info("Starting ComfyStream BYOC server with pytrickle StreamProcessor...") - + # Create frame processor with configuration frame_processor = ComfyStreamFrameProcessor( width=args.width, @@ -98,7 +98,7 @@ def force_print(*args, **kwargs): logging_level=args.comfyui_log_level, comfyui_inference_log_level=args.comfyui_inference_log_level ) - + # Create frame skip configuration only if enabled frame_skip_config = None if args.disable_frame_skip: @@ -106,14 +106,16 @@ def force_print(*args, **kwargs): else: frame_skip_config = FrameSkipConfig() logger.info("Frame skipping enabled: adaptive skipping based on queue sizes") - + # Create StreamProcessor with frame processor processor = StreamProcessor( video_processor=frame_processor.process_video_async, audio_processor=frame_processor.process_audio_async, model_loader=frame_processor.load_model, param_updater=frame_processor.update_params, + on_stream_start=frame_processor.on_stream_start, on_stream_stop=frame_processor.on_stream_stop, + warmup_handler=frame_processor.warmup, # Align processor name with capability for consistent logs name=(os.getenv("CAPABILITY_NAME") or "comfystream"), port=int(args.port), @@ -127,11 +129,7 @@ def force_print(*args, **kwargs): # Set the stream processor reference for text data publishing frame_processor.set_stream_processor(processor) - - # Create async startup function to load model - async def load_model_on_startup(app): - await processor._frame_processor.load_model() - + # Create async startup function for orchestrator registration async def register_orchestrator_startup(app): try: @@ -163,8 +161,7 @@ async def register_orchestrator_startup(app): # Clear ORCH_SECRET from environment even on error os.environ.pop("ORCH_SECRET", None) - # Add model loading and registration to startup hooks - processor.server.app.on_startup.append(load_model_on_startup) + # Add registration to startup hooks processor.server.app.on_startup.append(register_orchestrator_startup) # Add warmup endpoint: accepts same body as prompts update @@ -189,7 +186,7 @@ async def warmup_handler(request): # Mount 
at same API namespace as StreamProcessor defaults processor.server.add_route("POST", "/api/stream/warmup", warmup_handler) - + # Run the processor processor.run() diff --git a/server/frame_processor.py b/server/frame_processor.py index 8051105a..0f72fbdc 100644 --- a/server/frame_processor.py +++ b/server/frame_processor.py @@ -2,13 +2,14 @@ import json import logging import os -from typing import List +from typing import List, Optional -import numpy as np from pytrickle.frame_processor import FrameProcessor from pytrickle.frames import VideoFrame, AudioFrame +from pytrickle.utils.loading_overlay import build_loading_overlay_frame +from pytrickle.warmup_config import WarmupMode from comfystream.pipeline import Pipeline -from comfystream.utils import convert_prompt, ComfyStreamParamsUpdateRequest +from comfystream.utils import convert_prompt, ComfyStreamParamsUpdateRequest, get_default_workflow logger = logging.getLogger(__name__) @@ -16,32 +17,47 @@ class ComfyStreamFrameProcessor(FrameProcessor): """ Integrated ComfyStream FrameProcessor for pytrickle. - + This class wraps the ComfyStream Pipeline to work with pytrickle's streaming architecture. """ def __init__(self, text_poll_interval: float = 0.25, **load_params): """Initialize with load parameters for pipeline creation. - + Args: text_poll_interval: Interval in seconds to poll for text outputs (default: 0.25) **load_params: Parameters for pipeline creation """ + # Import here to avoid circular dependency + from pytrickle.warmup_config import WarmupConfig, WarmupMode + + # Initialize base class with warmup config + warmup_config = WarmupConfig( + mode=WarmupMode.OVERLAY, + message="Loading...", + progress=None, + enabled=True + ) + super().__init__(warmup_config=warmup_config) + + # ComfyStream-specific attributes self.pipeline = None self._load_params = load_params self._text_poll_interval = text_poll_interval self._stream_processor = None - self._warmup_task = None self._text_forward_task = None self._background_tasks = [] self._stop_event = asyncio.Event() - super().__init__() + self._runner_active = False + + # Custom comfystream warmup passthrough toggle + self._warmup_passthrough_enabled: bool = False def set_stream_processor(self, stream_processor): """Set reference to StreamProcessor for data publishing.""" self._stream_processor = stream_processor logger.info("StreamProcessor reference set for text data publishing") - + def _setup_text_monitoring(self): """Set up background text forwarding from the pipeline.""" try: @@ -105,7 +121,7 @@ async def _stop_text_forwarder(self) -> None: except Exception: logger.debug("Error while awaiting text forwarder cancellation", exc_info=True) self._text_forward_task = None - + async def on_stream_stop(self): """Called when stream stops - cleanup background tasks.""" logger.info("Stream stopped, cleaning up background tasks") @@ -113,17 +129,32 @@ async def on_stream_stop(self): # Set stop event to signal all background tasks to stop self._stop_event.set() - # Stop the ComfyStream client's prompt execution + # Stop the ComfyStream client's prompt execution immediately to avoid no-input logs if self.pipeline and self.pipeline.client: logger.info("Stopping ComfyStream client prompt execution") try: - await self.pipeline.client.cleanup() + await self.pipeline.client.stop_prompts_immediately() except Exception as e: logger.error(f"Error stopping ComfyStream client: {e}") + self._runner_active = False # Stop text forwarder await self._stop_text_forwarder() + # Cancel warmup if running and properly 
await it + try: + if self._warmup_task and not self._warmup_task.done(): + self._warmup_task.cancel() + try: + await self._warmup_task + logger.debug("Warmup task cancelled successfully") + except asyncio.CancelledError: + logger.debug("Warmup task cancelled") + except Exception: + logger.debug("Warmup task cancellation error", exc_info=True) + except Exception: + logger.debug("Error during warmup task cancellation", exc_info=True) + # Cancel any other background tasks started by this processor for task in list(self._background_tasks): try: @@ -144,77 +175,227 @@ async def on_stream_stop(self): self._background_tasks.clear() logger.info("All background tasks cleaned up") - + def _reset_stop_event(self): """Reset the stop event for a new stream.""" self._stop_event.clear() + + async def _ensure_runner_active(self) -> None: + """ + Ensure the prompt runner is active when real frames arrive. + + This is called on the first real input frame after warmup completes. + The pipeline was paused after warmup, so we resume it here. + """ + if not self.pipeline or not getattr(self.pipeline, "client", None): + return + if not self._runner_active: + await self.pipeline.client.ensure_prompt_tasks_running() + self.pipeline.client.resume() + self._runner_active = True + + def _build_loading_overlay_frame(self, frame: VideoFrame) -> VideoFrame: + """ + Render a loading overlay frame while warmup is in progress. + + Uses pytrickle's build_loading_overlay_frame to create an animated overlay + that preserves timing information from the original frame. + """ + try: + self._frame_counter += 1 + + # Use pytrickle's helper with base class config + overlay_frame = build_loading_overlay_frame( + original_frame=frame, + message=self.warmup_config.message if self.warmup_config else "Loading...", + frame_counter=self._frame_counter, + progress=self.warmup_config.progress if self.warmup_config else None + ) + + # Preserve application-specific side_data + overlay_frame.side_data = frame.side_data + return overlay_frame + except Exception: + logger.debug("Failed to generate loading overlay frame", exc_info=True) + return frame + async def load_model(self, **kwargs): - """Load model and initialize the pipeline.""" + """ + Load model and initialize pipeline with default workflow only. + + This method ONLY initializes the pipeline - no warmup is performed here. + Warmup is handled separately by pytrickle's base class after load_model completes. 
+ """ params = {**self._load_params, **kwargs} - - if self.pipeline is None: - self.pipeline = Pipeline( - width=int(params.get('width', 512)), - height=int(params.get('height', 512)), - cwd=params.get('workspace', os.getcwd()), - disable_cuda_malloc=params.get('disable_cuda_malloc', True), - gpu_only=params.get('gpu_only', True), - preview_method=params.get('preview_method', 'none'), - comfyui_inference_log_level=params.get('comfyui_inference_log_level', "INFO"), - logging_level=params.get('comfyui_inference_log_level', "INFO"), - blacklist_custom_nodes=["ComfyUI-Manager"], - ) - async def warmup(self): - """Warm up the pipeline.""" + # Initialize pipeline if needed + await self._initialize_pipeline(params) + + # Only set the default workflow if no prompts are currently configured + has_prompts = bool(getattr(self.pipeline.client, "current_prompts", None)) + + if not has_prompts: + default_workflow = get_default_workflow() + # Process prompts but skip warmup - warmup will be called separately by pytrickle + await self._process_prompts(default_workflow, skip_warmup=True) + + logger.debug("load_model completed - pipeline initialized with default workflow") + + async def warmup(self, **kwargs): + """ + Warm up the pipeline by sending frames through it. + + This method manages the pipeline pause/resume lifecycle: + 1. Process prompts if provided in kwargs + 2. Resume the pipeline to process warm frames + 3. Warm up video/audio as needed + 4. Pause the pipeline after warmup to save resources + 5. Pipeline will be resumed again on first real input frame + + Args: + **kwargs: Optional parameters, including: + - prompts: Workflow prompts to process before warmup + + The base class handles warmup state coordination and ensures the state + stays LOADING until this method completes. 
+ """ if not self.pipeline: logger.warning("Warmup requested before pipeline initialization") return - + + # Process prompts if provided (e.g., from parameter update) + if "prompts" in kwargs: + logger.info("Processing prompts during warmup") + await self._process_prompts(kwargs["prompts"], skip_warmup=True) + logger.info("Running pipeline warmup...") try: + # Resume pipeline for warmup processing + await self.pipeline.client.ensure_prompt_tasks_running() + self.pipeline.client.resume() + capabilities = self.pipeline.get_workflow_io_capabilities() logger.info(f"Detected I/O capabilities: {capabilities}") - + if capabilities.get("video", {}).get("input") or capabilities.get("video", {}).get("output"): await self.pipeline.warm_video() - + if capabilities.get("audio", {}).get("input") or capabilities.get("audio", {}).get("output"): await self.pipeline.warm_audio() - + except Exception as e: logger.error(f"Warmup failed: {e}") + finally: + # Pause pipeline after warmup to save resources + # Will be resumed again on first real input frame + try: + self.pipeline.client.pause() + except Exception: + logger.debug("Failed to pause prompt loop after warmup", exc_info=True) + self._runner_active = False + logger.info("Pipeline warmup finished") - def _schedule_warmup(self) -> None: - """Schedule warmup in background if not already running.""" + async def on_stream_start(self): + """Called when a new stream starts - prepare for streaming.""" + logger.info("Stream started, setting up monitoring and scheduling warmup") try: - if self._warmup_task and not self._warmup_task.done(): - logger.info("Warmup already in progress, skipping new warmup request") - return + self._reset_stop_event() + + # Ensure pipeline is initialized (fast, no warmup) + await self._initialize_pipeline() + + # Start text forwarder best-effort + self._setup_text_monitoring() + + # Only start warmup if not already active + # (load_model may have already triggered it via parameter update) + if not self._is_warmup_active(): + logger.info("Starting warmup from on_stream_start") + self._start_warmup_sequence(self.warmup()) + else: + logger.info("Warmup already active, not restarting") + except Exception as e: + logger.error(f"on_stream_start failed: {e}") + + async def _initialize_pipeline(self, params: dict = None): + """ + Ensure pipeline is initialized with the given parameters. + + Args: + params: Optional parameters for pipeline creation. If None, uses self._load_params. + """ + if self.pipeline is not None: + logger.debug("Pipeline already exists") + return + + if params is None: + params = self._load_params + + logger.info("Initializing pipeline") + self.pipeline = Pipeline( + width=int(params.get('width', 512)), + height=int(params.get('height', 512)), + cwd=params.get('workspace', os.getcwd()), + disable_cuda_malloc=params.get('disable_cuda_malloc', True), + gpu_only=params.get('gpu_only', True), + preview_method=params.get('preview_method', 'none'), + comfyui_inference_log_level=params.get('comfyui_inference_log_level', "INFO"), + logging_level=params.get('comfyui_inference_log_level', "INFO"), + blacklist_custom_nodes=["ComfyUI-Manager"], + ) + + def _set_warmup_passthrough(self, enabled: bool) -> None: + """ + Enable/disable passthrough during warmup (video only). - self._warmup_task = asyncio.create_task(self.warmup()) - logger.info("Warmup scheduled in background") + Updates the warmup config mode between OVERLAY and PASSTHROUGH. 
+ """ + try: + if self.warmup_config: + self.warmup_config.mode = WarmupMode.PASSTHROUGH if enabled else WarmupMode.OVERLAY + self._warmup_passthrough_enabled = bool(enabled) + logger.info( + "Warmup passthrough %s", + "enabled" if enabled else "disabled", + ) except Exception: - logger.warning("Failed to schedule warmup", exc_info=True) + logger.debug("Failed to set warmup passthrough flag", exc_info=True) async def process_video_async(self, frame: VideoFrame) -> VideoFrame: - """Process video frame through ComfyStream Pipeline.""" + """Process video frame through ComfyStream Pipeline or emit a loading overlay during warmup.""" try: - + if not self.pipeline: + return frame + + # Use base class helper to check warmup state + if self._is_warmup_active(): + # Check if we should show overlay or passthrough + if self._should_show_loading_overlay(): + if self._frame_counter % 30 == 1: # Log every ~1 second at 30fps + logger.debug(f"Warmup active: showing loading overlay (frame {self._frame_counter})") + return self._build_loading_overlay_frame(frame) + else: + if self._frame_counter % 30 == 1: + logger.debug(f"Warmup active: passthrough mode (frame {self._frame_counter})") + return frame + + # Log transition from warmup to normal processing + if self._warmup_done.is_set() and not self._runner_active: + logger.info("First frame after warmup complete - resuming runner for normal processing") + + await self._ensure_runner_active() + # Convert pytrickle VideoFrame to av.VideoFrame av_frame = frame.to_av_frame(frame.tensor) av_frame.pts = frame.timestamp av_frame.time_base = frame.time_base - - # Process through pipeline + await self.pipeline.put_video_frame(av_frame) processed_av_frame = await self.pipeline.get_processed_video_frame() - - # Convert back to pytrickle VideoFrame - processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) - return processed_frame - + + return VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) + except Exception as e: logger.error(f"Video processing failed: {e}") return frame @@ -224,14 +405,19 @@ async def process_audio_async(self, frame: AudioFrame) -> List[AudioFrame]: try: if not self.pipeline: return [frame] - + # Audio always passes through during warmup + if self._is_warmup_active(): + return [frame] + # On first frame of an active stream, start/resume runner + await self._ensure_runner_active() + # Audio processing needed - use pipeline av_frame = frame.to_av_frame() await self.pipeline.put_audio_frame(av_frame) processed_av_frame = await self.pipeline.get_processed_audio_frame() processed_frame = AudioFrame.from_av_audio(processed_av_frame) return [processed_frame] - + except Exception as e: logger.error(f"Audio processing failed: {e}") return [frame] @@ -240,44 +426,62 @@ async def update_params(self, params: dict): """Update processing parameters.""" if not self.pipeline: return - + # Handle list input - take first element if isinstance(params, list) and params: params = params[0] - + # Validate parameters using the centralized validation validated = ComfyStreamParamsUpdateRequest(**params).model_dump() logger.info(f"Parameter validation successful, keys: {list(validated.keys())}") - - # Process prompts if provided - if "prompts" in validated and validated["prompts"]: - await self._process_prompts(validated["prompts"]) - + # Update pipeline dimensions if "width" in validated: self.pipeline.width = int(validated["width"]) if "height" in validated: self.pipeline.height = int(validated["height"]) + + # Handle warmup - if 
prompts are provided, pass them to warmup + # If warmup is explicitly requested OR prompts are changing, trigger warmup + should_warmup = validated.get("warmup", False) or ("prompts" in validated and validated["prompts"]) - # Schedule warmup if requested - if validated.get("warmup", False): - self._schedule_warmup() - + if should_warmup: + if not self._is_warmup_active(): + # Clear pipeline queues before warmup to avoid processing stale frames + logger.info("Clearing pipeline queues before warmup") + await self.pipeline._clear_pipeline_queues() + + # Pass prompts to warmup if they exist + warmup_kwargs = {} + if "prompts" in validated and validated["prompts"]: + warmup_kwargs["prompts"] = validated["prompts"] + + self._start_warmup_sequence(self.warmup(**warmup_kwargs)) + else: + logger.info("Warmup already active, ignoring warmup request") + - async def _process_prompts(self, prompts): + async def _process_prompts(self, prompts, *, skip_warmup: bool = False): """Process and set prompts in the pipeline.""" try: converted = convert_prompt(prompts, return_dict=True) - + # Set prompts in pipeline await self.pipeline.set_prompts([converted]) logger.info(f"Prompts set successfully: {list(prompts.keys())}") - + + # Trigger loading overlay and warmup sequence for new prompts unless suppressed + if not skip_warmup: + try: + self._start_warmup_sequence(self.warmup()) + except Exception: + logger.debug("Failed to start warmup sequence after prompt update", exc_info=True) + # Update text monitoring based on workflow capabilities if self.pipeline.produces_text_output(): self._setup_text_monitoring() else: await self._stop_text_forwarder() - + except Exception as e: logger.error(f"Failed to process prompts: {e}") diff --git a/src/comfystream/client.py b/src/comfystream/client.py index 7686fca1..d6a6ddbb 100644 --- a/src/comfystream/client.py +++ b/src/comfystream/client.py @@ -1,9 +1,10 @@ import asyncio -from typing import List import logging +from typing import List +import contextlib from comfystream import tensor_cache -from comfystream.utils import convert_prompt +from comfystream.utils import convert_prompt, get_default_workflow from comfystream.exceptions import ComfyStreamInputTimeoutError from comfy.api.components.schema.prompt import PromptDictInput @@ -17,12 +18,17 @@ class ComfyStreamClient: def __init__(self, max_workers: int = 1, **kwargs): config = Configuration(**kwargs) self.comfy_client = EmbeddedComfyClient(config, max_workers=max_workers) - self.running_prompts = {} # To be used for cancelling tasks + self.running_prompts = {} self.current_prompts = [] self._cleanup_lock = asyncio.Lock() self._prompt_update_lock = asyncio.Lock() self._stop_event = asyncio.Event() + # PromptRunner state + self._shutdown_event = asyncio.Event() + self._run_enabled_event = asyncio.Event() + self._runner_task = None + async def set_prompts(self, prompts: List[PromptDictInput]): """Set new prompts, replacing any existing ones. 
@@ -36,15 +42,15 @@ async def set_prompts(self, prompts: List[PromptDictInput]): if not prompts: raise ValueError("Cannot set empty prompts list") - # Cancel existing prompts first to avoid conflicts - await self.cancel_running_prompts() - # Reset stop event for new prompts - self._stop_event.clear() + # Pause runner while swapping prompts to avoid interleaving + was_running = self._run_enabled_event.is_set() + self._run_enabled_event.clear() self.current_prompts = [convert_prompt(prompt) for prompt in prompts] - logger.info(f"Queuing {len(self.current_prompts)} prompt(s) for execution") - for idx in range(len(self.current_prompts)): - task = asyncio.create_task(self.run_prompt(idx)) - self.running_prompts[idx] = task + logger.info(f"Configured {len(self.current_prompts)} prompt(s)") + # Ensure runner exists (IDLE until resumed) + await self.ensure_prompt_tasks_running() + if was_running: + self._run_enabled_event.set() async def update_prompts(self, prompts: List[PromptDictInput]): async with self._prompt_update_lock: @@ -57,34 +63,64 @@ async def update_prompts(self, prompts: List[PromptDictInput]): for idx, prompt in enumerate(prompts): converted_prompt = convert_prompt(prompt) try: + # Lightweight validation by queueing is retained for compatibility await self.comfy_client.queue_prompt(converted_prompt) self.current_prompts[idx] = converted_prompt except Exception as e: raise Exception(f"Prompt update failed: {str(e)}") from e - async def run_prompt(self, prompt_index: int): - while not self._stop_event.is_set(): - async with self._prompt_update_lock: - try: - await self.comfy_client.queue_prompt(self.current_prompts[prompt_index]) - except asyncio.CancelledError: - raise - except ComfyStreamInputTimeoutError: - # Timeout errors are expected during stream switching - just continue - logger.info(f"Input for prompt {prompt_index} timed out, continuing") - continue - except Exception as e: - await self.cleanup() - logger.error(f"Error running prompt: {str(e)}") - raise + async def ensure_prompt_tasks_running(self): + # Ensure the single runner task exists (does not force running) + if self._runner_task and not self._runner_task.done(): + return + if not self.current_prompts: + return + self._shutdown_event.clear() + self._runner_task = asyncio.create_task(self._runner_loop()) + + async def _runner_loop(self): + try: + while not self._shutdown_event.is_set(): + # IDLE until running is enabled + await self._run_enabled_event.wait() + # Snapshot prompts without holding the lock during network I/O + async with self._prompt_update_lock: + prompts_snapshot = list(self.current_prompts) + for prompt_index, prompt in enumerate(prompts_snapshot): + if self._shutdown_event.is_set() or not self._run_enabled_event.is_set(): + break + try: + await self.comfy_client.queue_prompt(prompt) + except asyncio.CancelledError: + raise + except ComfyStreamInputTimeoutError: + logger.info(f"Input for prompt {prompt_index} timed out, continuing") + continue + except Exception as e: + logger.error(f"Error running prompt: {str(e)}") + logger.info("Stopping prompt execution and returning to passthrough mode") + + # Stop running and switch to default passthrough workflow + await self._fallback_to_passthrough() + break + except asyncio.CancelledError: + pass async def cleanup(self): - # Set stop event to signal prompt loops to exit + # Signal runner to shutdown self._stop_event.set() - - await self.cancel_running_prompts() + self._shutdown_event.set() + if self._runner_task: + self._runner_task.cancel() + with 
contextlib.suppress(asyncio.CancelledError): + await self._runner_task + self._runner_task = None + + # Pause running + self._run_enabled_event.clear() + async with self._cleanup_lock: - if self.comfy_client.is_running: + if getattr(self.comfy_client, "is_running", False): try: await self.comfy_client.__aexit__() except Exception as e: @@ -94,15 +130,8 @@ async def cleanup(self): logger.info("Client cleanup complete") async def cancel_running_prompts(self): - async with self._cleanup_lock: - tasks_to_cancel = list(self.running_prompts.values()) - for task in tasks_to_cancel: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - self.running_prompts.clear() + """Compatibility: pause the runner without destroying it.""" + self._run_enabled_event.clear() async def cleanup_queues(self): @@ -121,6 +150,43 @@ async def cleanup_queues(self): while not tensor_cache.text_outputs.empty(): await tensor_cache.text_outputs.get() + # Explicit lifecycle helpers for external controllers (FrameProcessor) + def resume(self): + self._run_enabled_event.set() + + def pause(self): + self._run_enabled_event.clear() + + async def stop_prompts_immediately(self): + """Cancel the runner task to immediately stop any in-flight prompt execution.""" + self._run_enabled_event.clear() + if self._runner_task: + self._runner_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._runner_task + self._runner_task = None + + async def _fallback_to_passthrough(self): + """Switch to default passthrough workflow when an error occurs.""" + try: + # Pause the runner + self._run_enabled_event.clear() + + # Set to default passthrough workflow + default_workflow = get_default_workflow() + async with self._prompt_update_lock: + self.current_prompts = [convert_prompt(default_workflow)] + + logger.info("Switched to default passthrough workflow") + + # Resume the runner with passthrough workflow + self._run_enabled_event.set() + + except Exception as e: + logger.error(f"Failed to fallback to passthrough: {str(e)}") + # If fallback fails, just pause execution + self._run_enabled_event.clear() + def put_video_input(self, frame): if tensor_cache.image_inputs.full(): tensor_cache.image_inputs.get(block=True)
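
Reviewer note (not part of the patch): the diff replaces the old per-prompt run_prompt tasks with a single pausable runner on ComfyStreamClient, controlled via ensure_prompt_tasks_running / resume / pause / stop_prompts_immediately, with _fallback_to_passthrough on execution errors. The following is a minimal usage sketch of that intended lifecycle, mirroring what ComfyStreamFrameProcessor does around warmup. It only uses names that appear in the diff (ComfyStreamClient, set_prompts, ensure_prompt_tasks_running, resume, pause, stop_prompts_immediately, cleanup, get_default_workflow); the sleep stands in for the pipeline's warm_video()/warm_audio() calls, and it assumes the client can be constructed with default configuration. Treat it as an illustration of the state transitions rather than a runnable test, since the runner drives a real embedded ComfyUI client.

    import asyncio

    from comfystream.client import ComfyStreamClient
    from comfystream.utils import get_default_workflow


    async def demo_runner_lifecycle():
        # Assumption: default Configuration is sufficient for a local sketch.
        client = ComfyStreamClient(max_workers=1)

        # Configure prompts; per the patch this creates the runner task but
        # leaves it idle (paused) until resume() is called.
        await client.set_prompts([get_default_workflow()])

        # Warmup phase: make sure the runner task exists, then let it execute.
        await client.ensure_prompt_tasks_running()
        client.resume()
        await asyncio.sleep(1.0)  # stand-in for pipeline.warm_video()/warm_audio()

        # Pause after warmup to save resources; nothing runs until resume() again.
        client.pause()

        # First real input frame arrives: resume the same runner instead of
        # recreating per-prompt tasks (what _ensure_runner_active does).
        client.resume()

        # Stream stop: cancel any in-flight prompt execution, then clean up.
        await client.stop_prompts_immediately()
        await client.cleanup()


    if __name__ == "__main__":
        asyncio.run(demo_runner_lifecycle())

Compared with the previous design, set_prompts now only swaps the prompt list while execution is paused, stream stop/start no longer cancels and recreates asyncio tasks, and an error inside the runner loop falls back to the default passthrough workflow instead of tearing the client down.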