diff --git a/apps/web-evals/src/actions/queue.ts b/apps/web-evals/src/actions/queue.ts new file mode 100644 index 00000000000..c6723f76d78 --- /dev/null +++ b/apps/web-evals/src/actions/queue.ts @@ -0,0 +1,113 @@ +"use server" + +import fs from "fs" +import { spawn } from "child_process" +import { revalidatePath } from "next/cache" + +import { deleteRun as _deleteRun } from "@roo-code/evals" + +import { redisClient } from "@/lib/server/redis" + +const RUN_QUEUE_KEY = "evals:run-queue" +const ACTIVE_RUN_KEY = "evals:active-run" +const DISPATCH_LOCK_KEY = "evals:dispatcher:lock" +const ACTIVE_RUN_TTL_SECONDS = 60 * 60 * 12 // 12 hours +const DISPATCH_LOCK_TTL_SECONDS = 30 + +async function spawnController(runId: number) { + const isRunningInDocker = fs.existsSync("/.dockerenv") + + const dockerArgs = [ + `--name evals-controller-${runId}`, + "--rm", + "--network evals_default", + "-v /var/run/docker.sock:/var/run/docker.sock", + "-v /tmp/evals:/var/log/evals", + "-e HOST_EXECUTION_METHOD=docker", + ] + + const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}` + + const command = isRunningInDocker + ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` + : cliCommand + + const childProcess = spawn("sh", ["-c", command], { + detached: true, + stdio: ["ignore", "pipe", "pipe"], + }) + + // Best-effort logging of controller output + try { + const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) + childProcess.stdout?.pipe(logStream) + childProcess.stderr?.pipe(logStream) + } catch (_error) { + // Intentionally ignore logging pipe errors + } + + childProcess.unref() +} + +/** + * Enqueue a run into the global FIFO (idempotent). + */ +export async function enqueueRun(runId: number) { + const redis = await redisClient() + const exists = await redis.lPos(RUN_QUEUE_KEY, runId.toString()) + if (exists === null) { + await redis.rPush(RUN_QUEUE_KEY, runId.toString()) + } + revalidatePath("/runs") +} + +/** + * Dispatcher: if no active run, pop next from queue and start controller. + * Uses a short-lived lock to avoid races between concurrent dispatchers. + */ +export async function dispatchNextRun() { + const redis = await redisClient() + + // Try to acquire dispatcher lock + const locked = await redis.set(DISPATCH_LOCK_KEY, "1", { NX: true, EX: DISPATCH_LOCK_TTL_SECONDS }) + if (!locked) return + + try { + // If an active run is present, nothing to do. + const active = await redis.get(ACTIVE_RUN_KEY) + if (active) return + + const nextId = await redis.lPop(RUN_QUEUE_KEY) + if (!nextId) return + + const ok = await redis.set(ACTIVE_RUN_KEY, nextId, { NX: true, EX: ACTIVE_RUN_TTL_SECONDS }) + if (!ok) { + // put it back to preserve order and exit + await redis.lPush(RUN_QUEUE_KEY, nextId) + return + } + + await spawnController(Number(nextId)) + } finally { + await redis.del(DISPATCH_LOCK_KEY).catch(() => {}) + } +} + +/** + * Return 1-based position in the global FIFO queue, or null if not queued. + */ +export async function getQueuePosition(runId: number): Promise { + const redis = await redisClient() + const idx = await redis.lPos(RUN_QUEUE_KEY, runId.toString()) + return idx === null ? null : idx + 1 +} + +/** + * Remove a queued run from the FIFO queue and delete the run record. + */ +export async function cancelQueuedRun(runId: number) { + const redis = await redisClient() + await redis.lRem(RUN_QUEUE_KEY, 1, runId.toString()) + await _deleteRun(runId) + revalidatePath("/runs") +} diff --git a/apps/web-evals/src/actions/runs.ts b/apps/web-evals/src/actions/runs.ts index 2eae1f6804a..b10747f889f 100644 --- a/apps/web-evals/src/actions/runs.ts +++ b/apps/web-evals/src/actions/runs.ts @@ -1,9 +1,9 @@ "use server" import * as path from "path" -import fs from "fs" import { fileURLToPath } from "url" -import { spawn } from "child_process" + +import { enqueueRun, dispatchNextRun } from "@/actions/queue" import { revalidatePath } from "next/cache" import pMap from "p-map" @@ -52,41 +52,9 @@ export async function createRun({ suite, exercises = [], systemPrompt, timeout, revalidatePath("/runs") try { - const isRunningInDocker = fs.existsSync("/.dockerenv") - - const dockerArgs = [ - `--name evals-controller-${run.id}`, - "--rm", - "--network evals_default", - "-v /var/run/docker.sock:/var/run/docker.sock", - "-v /tmp/evals:/var/log/evals", - "-e HOST_EXECUTION_METHOD=docker", - ] - - const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${run.id}` - - const command = isRunningInDocker - ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` - : cliCommand - - console.log("spawn ->", command) - - const childProcess = spawn("sh", ["-c", command], { - detached: true, - stdio: ["ignore", "pipe", "pipe"], - }) - - const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) - - if (childProcess.stdout) { - childProcess.stdout.pipe(logStream) - } - - if (childProcess.stderr) { - childProcess.stderr.pipe(logStream) - } - - childProcess.unref() + // Enqueue the run and attempt to dispatch if no active run exists. + await enqueueRun(run.id) + await dispatchNextRun() } catch (error) { console.error(error) } diff --git a/apps/web-evals/src/components/home/run.tsx b/apps/web-evals/src/components/home/run.tsx index c35673885c3..2f4efeb5410 100644 --- a/apps/web-evals/src/components/home/run.tsx +++ b/apps/web-evals/src/components/home/run.tsx @@ -1,10 +1,13 @@ import { useCallback, useState, useRef } from "react" import Link from "next/link" -import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash } from "lucide-react" +import { useQuery } from "@tanstack/react-query" +import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash, XCircle } from "lucide-react" import type { Run as EvalsRun, TaskMetrics as EvalsTaskMetrics } from "@roo-code/evals" import { deleteRun } from "@/actions/runs" +import { getHeartbeat } from "@/actions/heartbeat" +import { getQueuePosition, cancelQueuedRun } from "@/actions/queue" import { formatCurrency, formatDuration, formatTokens, formatToolUsageSuccessRate } from "@/lib/formatters" import { useCopyRun } from "@/hooks/use-copy-run" import { @@ -35,6 +38,23 @@ export function Run({ run, taskMetrics }: RunProps) { const continueRef = useRef(null) const { isPending, copyRun, copied } = useCopyRun(run.id) + // Poll heartbeat and queue position for status column + const { data: heartbeat } = useQuery({ + queryKey: ["getHeartbeat", run.id], + queryFn: () => getHeartbeat(run.id), + refetchInterval: 10_000, + }) + + const { data: queuePosition } = useQuery({ + queryKey: ["getQueuePosition", run.id], + queryFn: () => getQueuePosition(run.id), + refetchInterval: 10_000, + }) + + const isCompleted = !!run.taskMetricsId + const isRunning = !!heartbeat + const isQueued = !isCompleted && !isRunning && queuePosition !== null && queuePosition !== undefined + const onConfirmDelete = useCallback(async () => { if (!deleteRunId) { return @@ -51,6 +71,9 @@ export function Run({ run, taskMetrics }: RunProps) { return ( <> + + {isCompleted ? "Completed" : isRunning ? "Running" : isQueued ? <>Queued (#{queuePosition}) : ""} + {run.model} {run.passed} {run.failed} @@ -116,6 +139,21 @@ export function Run({ run, taskMetrics }: RunProps) { )} + {isQueued && ( + { + try { + await cancelQueuedRun(run.id) + } catch (error) { + console.error(error) + } + }}> +
+ +
Cancel
+
+
+ )} { setDeleteRunId(run.id) diff --git a/apps/web-evals/src/components/home/runs.tsx b/apps/web-evals/src/components/home/runs.tsx index 8bc8739b28e..547c3ac4486 100644 --- a/apps/web-evals/src/components/home/runs.tsx +++ b/apps/web-evals/src/components/home/runs.tsx @@ -18,6 +18,7 @@ export function Runs({ runs }: { runs: RunWithTaskMetrics[] }) { + Status Model Passed Failed @@ -34,7 +35,7 @@ export function Runs({ runs }: { runs: RunWithTaskMetrics[] }) { runs.map(({ taskMetrics, ...run }) => ) ) : ( - + No eval runs yet.