Merged

24 commits
e13aa81
feat(prompts): implement pause and resume functionality for prompt ex…
eliteprox Nov 7, 2025
6b0c46e
Merge branch 'main' into feat/prompt-exec-ctrl
eliteprox Nov 11, 2025
754929f
app.py(refactor): organize control message handling logic to use dedi…
eliteprox Nov 11, 2025
8715fb6
byoc: remove pause/resume handlers
eliteprox Nov 11, 2025
c9abfee
feat(error-handling): enhance error response handling in control mess…
eliteprox Nov 11, 2025
452ab1a
fix(prompts): enhance prompt management and error handling
eliteprox Nov 11, 2025
0a9ca64
fix(pipeline): update prompt handling and enhance loading overlay man…
eliteprox Nov 12, 2025
ce06c48
feat(app): add resolution handling in offer function
eliteprox Nov 12, 2025
ea98c7f
refactor(client): update cancel_running_prompts method to stop prompt…
eliteprox Nov 12, 2025
9733f36
chore: update dependencies and enhance loading overlay management
eliteprox Nov 12, 2025
8d7f841
revert devcontainer changes
eliteprox Nov 12, 2025
31249fe
refactor(client): remove unused stop_event and simplify prompt contro…
eliteprox Nov 13, 2025
44d9b96
bump pytrickle
eliteprox Nov 13, 2025
86c3889
update pytrickle dependency to latest commit and remove unused warmup…
eliteprox Nov 13, 2025
66ba1a7
byoc(stream_processor): add LoadingConfig with overlay mode enabled
eliteprox Nov 18, 2025
d1e73ef
bump pytrickle for cv2 check
eliteprox Nov 18, 2025
dd315ef
update pytrickle dependency and enhance stream start handling
eliteprox Nov 23, 2025
a2a4d64
refactor(byoc): add `frame_count_to_disable` parameter in the loading…
eliteprox Nov 25, 2025
7a89091
bump pytrickle
eliteprox Nov 25, 2025
3b6708f
chore(deps): pin pytrickle v0.1.5 in pyproject.toml and requirements.txt
eliteprox Nov 25, 2025
77161f4
revert changes to devcontainer
eliteprox Nov 25, 2025
c68d7dd
fix requirements.txt
eliteprox Nov 25, 2025
805b586
refactor(byoc, frame_processor): standardize timeout handling for ove…
eliteprox Nov 25, 2025
4b8c7ca
fix(server.app): remove redundant pipeline cleanup calls from individ…
eliteprox Nov 25, 2025
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -9,7 +9,7 @@ version = "0.1.6"
license = { file = "LICENSE" }
dependencies = [
"asyncio",
"pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.4",
"pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.5",
"comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba",
"aiortc",
"aiohttp",
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
asyncio
- pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.4
+ pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.5
comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba
aiortc
aiohttp
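Both manifests bump the pytrickle pin from v0.1.4 to v0.1.5. A quick way to confirm the pin took effect after reinstalling is the following minimal sketch, assuming pytrickle exposes standard package metadata under that distribution name:

# Minimal sketch: verify the installed pytrickle matches the new pin.
# Assumes the git tag v0.1.5 maps to package version "0.1.5"; adjust if not.
from importlib.metadata import PackageNotFoundError, version

try:
    installed = version("pytrickle")
    assert installed == "0.1.5", f"expected 0.1.5, got {installed}"
    print(f"pytrickle {installed} OK")
except PackageNotFoundError:
    print("pytrickle is not installed")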
225 changes: 158 additions & 67 deletions server/app.py
@@ -97,8 +97,6 @@ async def collect_frames(self):
logger.info("Frame collection task cancelled")
except Exception as e:
logger.error(f"Unexpected error in frame collection: {str(e)}")
- finally:
- await self.pipeline.cleanup()

async def recv(self):
"""Receive a processed video frame from the pipeline, increment the frame
@@ -195,8 +193,6 @@ async def collect_frames(self):
logger.info("Frame collection task cancelled")
except Exception as e:
logger.error(f"Unexpected error in audio frame collection: {str(e)}")
- finally:
- await self.pipeline.cleanup()

async def recv(self):
return await self.pipeline.get_processed_audio_frame()
@@ -252,13 +248,6 @@ async def offer(request):
prompts = params.get("prompts")
is_noop_mode = not prompts

- if is_noop_mode:
- logger.info("[Offer] No prompts provided - entering noop passthrough mode")
- else:
- await pipeline.set_prompts(prompts)
- logger.info("[Offer] Set workflow prompts")
-
- # Set resolution if provided in the offer
resolution = params.get("resolution")
if resolution:
pipeline.width = resolution["width"]
@@ -267,6 +256,16 @@
f"[Offer] Set pipeline resolution to {resolution['width']}x{resolution['height']}"
)

+ if is_noop_mode:
+ logger.info("[Offer] No prompts provided - entering noop passthrough mode")
+ else:
+ await pipeline.apply_prompts(
+ prompts,
+ skip_warmup=False,
+ )
+ await pipeline.start_streaming()
+ logger.info("[Offer] Set workflow prompts, warmed pipeline, and started execution")
+
offer_params = params["offer"]
offer = RTCSessionDescription(sdp=offer_params["sdp"], type=offer_params["type"])

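The offer handler now applies resolution before prompts, then warms the pipeline and starts execution up front rather than waiting for a later control message. A hedged sketch of a matching /offer request follows; the payload keys (prompts, resolution, offer) mirror the handler above, while the URL, port, and prompt shape are assumptions:

# Hedged sketch of an /offer call under the new flow; SDP generation omitted.
import aiohttp

async def send_offer(sdp: str, workflow: dict) -> dict:
    payload = {
        "prompts": [workflow],  # a ComfyUI workflow prompt; omit for noop passthrough
        "resolution": {"width": 512, "height": 512},
        "offer": {"sdp": sdp, "type": "offer"},
    }
    async with aiohttp.ClientSession() as session:
        # Endpoint path and port are assumptions for illustration.
        async with session.post("http://localhost:8889/offer", json=payload) as resp:
            return await resp.json()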
@@ -312,57 +311,140 @@ def on_datachannel(channel):

@channel.on("message")
async def on_message(message):
try:
params = json.loads(message)
def send_json(payload):
channel.send(json.dumps(payload))

def send_success_response(response_type, **extra):
payload = {"type": response_type, "success": True}
payload.update(extra)
send_json(payload)

def send_error_response(response_type, error_message, **extra):
payload = {
"type": response_type,
"success": False,
"error": error_message,
}
payload.update(extra)
send_json(payload)

async def handle_get_nodes(_params):
nodes_info = await pipeline.get_nodes_info()
send_json({"type": "nodes_info", "nodes": nodes_info})

async def handle_update_prompts(_params):
if "prompts" not in _params:
logger.warning("[Control] Missing prompt in update_prompt message")
send_error_response(
"prompts_updated", "Missing 'prompts' in control message"
)
return
try:
await pipeline.update_prompts(_params["prompts"])
except Exception as e:
logger.error(f"Error updating prompt: {str(e)}")
send_error_response("prompts_updated", str(e))
return
send_success_response("prompts_updated")

async def handle_update_resolution(_params):
width = _params.get("width")
height = _params.get("height")
if width is None or height is None:
logger.warning(
"[Control] Missing width or height in update_resolution message"
)
send_error_response(
"resolution_updated",
"Missing 'width' or 'height' in control message",
)
return

if is_noop_mode:
logger.info(
f"[Control] Noop mode - resolution update to {width}x{height} (no pipeline involved)"
)
else:
# Update pipeline resolution for future frames
pipeline.width = width
pipeline.height = height
logger.info(f"[Control] Updated resolution to {width}x{height}")

# Mark that we've received resolution
resolution_received["value"] = True

if is_noop_mode:
logger.info("[Control] Noop mode - no warmup needed")
else:
# Note: Video warmup now happens during offer, not here
logger.info(
"[Control] Resolution updated - warmup was already performed during offer"
)

if params.get("type") == "get_nodes":
nodes_info = await pipeline.get_nodes_info()
response = {"type": "nodes_info", "nodes": nodes_info}
channel.send(json.dumps(response))
elif params.get("type") == "update_prompts":
if "prompts" not in params:
logger.warning("[Control] Missing prompt in update_prompt message")
send_success_response("resolution_updated")

async def handle_pause_prompts(_params):
if is_noop_mode:
logger.info("[Control] Noop mode - no prompts to pause")
else:
try:
await pipeline.pause_prompts()
logger.info("[Control] Paused prompt execution")
except Exception as e:
logger.error(f"[Control] Error pausing prompts: {str(e)}")
send_error_response("prompts_paused", str(e))
return
send_success_response("prompts_paused")

async def handle_resume_prompts(_params):
if is_noop_mode:
logger.info("[Control] Noop mode - no prompts to resume")
else:
try:
await pipeline.update_prompts(params["prompts"])
await pipeline.start_streaming()
logger.info("[Control] Resumed prompt execution")
except Exception as e:
logger.error(f"Error updating prompt: {str(e)}")
response = {"type": "prompts_updated", "success": True}
channel.send(json.dumps(response))
elif params.get("type") == "update_resolution":
if "width" not in params or "height" not in params:
logger.warning(
"[Control] Missing width or height in update_resolution message"
)
logger.error(f"[Control] Error resuming prompts: {str(e)}")
send_error_response("prompts_resumed", str(e))
return
send_success_response("prompts_resumed")

if is_noop_mode:
logger.info(
f"[Control] Noop mode - resolution update to {params['width']}x{params['height']} (no pipeline involved)"
)
else:
# Update pipeline resolution for future frames
pipeline.width = params["width"]
pipeline.height = params["height"]
logger.info(
f"[Control] Updated resolution to {params['width']}x{params['height']}"
)

# Mark that we've received resolution
resolution_received["value"] = True

if is_noop_mode:
logger.info("[Control] Noop mode - no warmup needed")
else:
# Note: Video warmup now happens during offer, not here
logger.info(
"[Control] Resolution updated - warmup was already performed during offer"
)

response = {"type": "resolution_updated", "success": True}
channel.send(json.dumps(response))
async def handle_stop_prompts(_params):
if is_noop_mode:
logger.info("[Control] Noop mode - no prompts to stop")
else:
logger.warning("[Server] Invalid message format - missing required fields")
try:
await pipeline.stop_prompts(cleanup=False)
logger.info("[Control] Stopped prompt execution")
except Exception as e:
logger.error(f"[Control] Error stopping prompts: {str(e)}")
send_error_response("prompts_stopped", str(e))
return
send_success_response("prompts_stopped")

handlers = {
"get_nodes": handle_get_nodes,
"update_prompts": handle_update_prompts,
"update_resolution": handle_update_resolution,
"pause_prompts": handle_pause_prompts,
"resume_prompts": handle_resume_prompts,
"stop_prompts": handle_stop_prompts,
}

try:
params = json.loads(message)
message_type = params.get("type")

if not message_type:
logger.warning("[Server] Control message missing 'type'")
return

handler = handlers.get(message_type)
if handler is None:
logger.warning(f"[Server] Unsupported control message: {message_type}")
return

await handler(params)
except json.JSONDecodeError:
logger.error("[Server] Invalid JSON received")
except Exception as e:
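The rewritten handler dispatches on the message "type" field through the handlers table and always reports success or failure back on the same channel. A minimal client-side sketch of the protocol, assuming an already-negotiated aiortc RTCDataChannel; the message and reply names are taken from the handlers above:

import json

def attach_control(channel):
    # Every reply carries "type" plus "success", and "error" on failure.
    @channel.on("message")
    def on_reply(message):
        reply = json.loads(message)
        if reply.get("success", True):
            print("ok:", reply.get("type"))
        else:
            print("error:", reply.get("type"), reply.get("error"))

    # Requests are JSON objects selected by their "type" field.
    channel.send(json.dumps({"type": "pause_prompts"}))
    channel.send(json.dumps({"type": "update_resolution", "width": 768, "height": 768}))
    channel.send(json.dumps({"type": "resume_prompts"}))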
@@ -498,6 +580,13 @@ async def on_connectionstatechange():
if not task.done():
task.cancel()
request.app["data_channel_tasks"].clear()
+ # Cleanup pipeline once per connection (not per track)
+ if not is_noop_mode:
+ try:
+ await pipeline.stop_prompts(cleanup=True)
+ logger.info("Pipeline cleanup completed for failed connection")
+ except Exception as e:
+ logger.error(f"Error during pipeline cleanup on connection failure: {e}")
elif pc.connectionState == "closed":
await pc.close()
pcs.discard(pc)
@@ -507,6 +596,13 @@ async def on_connectionstatechange():
if not task.done():
task.cancel()
request.app["data_channel_tasks"].clear()
+ # Cleanup pipeline once per connection (not per track)
+ if not is_noop_mode:
+ try:
+ await pipeline.stop_prompts(cleanup=True)
+ logger.info("Pipeline cleanup completed for closed connection")
+ except Exception as e:
+ logger.error(f"Error during pipeline cleanup on connection close: {e}")

await pc.setRemoteDescription(offer)

Expand All @@ -519,15 +615,7 @@ async def on_connectionstatechange():
)

# Warm up the pipeline based on detected modalities and SDP content (skip in noop mode)
- if not is_noop_mode:
- if "m=video" in pc.remoteDescription.sdp and pipeline.accepts_video_input():
- logger.info("[Offer] Warming up video pipeline")
- await pipeline.warm_video()
-
- if "m=audio" in pc.remoteDescription.sdp and pipeline.accepts_audio_input():
- logger.info("[Offer] Warming up audio pipeline")
- await pipeline.warm_audio()
- else:
+ if is_noop_mode:
logger.debug("[Offer] Skipping pipeline warmup in noop mode")

answer = await pc.createAnswer()
@@ -541,7 +629,7 @@

async def cancel_collect_frames(track):
track.running = False
- if hasattr(track, "collect_task") is not None and not track.collect_task.done():
+ if track.collect_task and not track.collect_task.done():
try:
track.collect_task.cancel()
await track.collect_task
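The old guard here was a no-op: hasattr() returns a bool, and neither True nor False is None, so the branch always ran and track.collect_task.done() could raise AttributeError when the attribute was never set. A one-line demonstration:

# hasattr() yields True or False; comparing either to None is always True.
print(hasattr(object(), "collect_task") is not None)  # prints True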
@@ -553,7 +641,7 @@ async def set_prompt(request):
pipeline = request.app["pipeline"]

prompt = await request.json()
- await pipeline.set_prompts(prompt)
+ await pipeline.apply_prompts(prompt)

return web.Response(content_type="application/json", text="OK")

@@ -576,6 +664,7 @@ async def on_startup(app: web.Application):
comfyui_inference_log_level=app.get("comfyui_inference_log_level", None),
blacklist_custom_nodes=["ComfyUI-Manager"],
)
await app["pipeline"].initialize()
app["pcs"] = set()
app["video_tracks"] = {}

@@ -676,7 +765,9 @@ def force_print(*args, **kwargs):
logging.getLogger("comfy").setLevel(log_level)

# Add ComfyStream timeout filter to suppress verbose execution logging
logging.getLogger("comfy.cmd.execution").addFilter(ComfyStreamTimeoutFilter())
timeout_filter = ComfyStreamTimeoutFilter()
logging.getLogger("comfy.cmd.execution").addFilter(timeout_filter)
logging.getLogger("comfystream").addFilter(timeout_filter)
if args.comfyui_inference_log_level:
app["comfyui_inference_log_level"] = args.comfyui_inference_log_level

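Reusing a single filter instance keeps the suppression rule in one place for both loggers. The real ComfyStreamTimeoutFilter is defined elsewhere in the codebase; the stand-in below only illustrates the shape such a filter takes, and its matching rule is an assumption:

import logging

class ComfyStreamTimeoutFilter(logging.Filter):
    """Illustrative stand-in; the real matching logic lives in server code."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False drops the record. Assumed rule: mute per-frame
        # execution timeout chatter.
        return "timeout" not in record.getMessage().lower()

shared = ComfyStreamTimeoutFilter()
logging.getLogger("comfy.cmd.execution").addFilter(shared)
logging.getLogger("comfystream").addFilter(shared)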