diff --git a/apps/web-evals/src/actions/runs.ts b/apps/web-evals/src/actions/runs.ts index 2eae1f6804a..3468077b812 100644 --- a/apps/web-evals/src/actions/runs.ts +++ b/apps/web-evals/src/actions/runs.ts @@ -15,12 +15,118 @@ import { deleteRun as _deleteRun, createTask, getExercisesForLanguage, + findRun, } from "@roo-code/evals" import { CreateRun } from "@/lib/schemas" +import { redisClient } from "@/lib/server/redis" const EVALS_REPO_PATH = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../../../../../evals") +// Queue management keys (matching the ones in packages/evals/src/cli/redis.ts) +const getRunQueueKey = () => `evals:run-queue` +const getActiveRunKey = () => `evals:active-run` +const getDispatcherLockKey = () => `evals:dispatcher:lock` + +async function spawnController(runId: number) { + const isRunningInDocker = fs.existsSync("/.dockerenv") + + const dockerArgs = [ + `--name evals-controller-${runId}`, + "--rm", + "--network evals_default", + "-v /var/run/docker.sock:/var/run/docker.sock", + "-v /tmp/evals:/var/log/evals", + "-e HOST_EXECUTION_METHOD=docker", + ] + + const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}` + + const command = isRunningInDocker + ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` + : cliCommand + + console.log("spawn ->", command) + + const childProcess = spawn("sh", ["-c", command], { + detached: true, + stdio: ["ignore", "pipe", "pipe"], + }) + + const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) + + if (childProcess.stdout) { + childProcess.stdout.pipe(logStream) + } + + if (childProcess.stderr) { + childProcess.stderr.pipe(logStream) + } + + childProcess.unref() +} + +export async function dispatchNextRun() { + const redis = await redisClient() + + // Try to acquire dispatcher lock (10 second TTL) + const lockAcquired = await redis.set(getDispatcherLockKey(), Date.now().toString(), { + NX: true, + EX: 10, + }) + + if (lockAcquired !== "OK") { + console.log("Dispatcher lock already held, skipping dispatch") + return + } + + try { + // Check if there's already an active run + const activeRunId = await redis.get(getActiveRunKey()) + if (activeRunId) { + console.log(`Run ${activeRunId} is already active, skipping dispatch`) + return + } + + // Pop the next run from the queue + const nextRunId = await redis.lPop(getRunQueueKey()) + if (!nextRunId) { + console.log("No runs in queue") + return + } + + const runId = parseInt(nextRunId, 10) + console.log(`Dispatching run ${runId}`) + + // Set as active run with generous TTL (1 hour default, will be cleared when run completes) + const setActive = await redis.set(getActiveRunKey(), runId.toString(), { + NX: true, + EX: 3600, + }) + + if (setActive !== "OK") { + // Another process may have set an active run, put this run back in the queue + console.log("Failed to set active run, requeueing") + await redis.lPush(getRunQueueKey(), runId.toString()) + return + } + + // Spawn the controller for this run + try { + await spawnController(runId) + console.log(`Successfully spawned controller for run ${runId}`) + } catch (error) { + console.error(`Failed to spawn controller for run ${runId}:`, error) + // Clear active run and requeue on spawn failure + await redis.del(getActiveRunKey()) + await redis.lPush(getRunQueueKey(), runId.toString()) + } + } finally { + // Release dispatcher lock + await redis.del(getDispatcherLockKey()) + } +} + // eslint-disable-next-line @typescript-eslint/no-unused-vars export async function createRun({ suite, exercises = [], systemPrompt, timeout, ...values }: CreateRun) { const run = await _createRun({ @@ -51,50 +157,70 @@ export async function createRun({ suite, exercises = [], systemPrompt, timeout, revalidatePath("/runs") + // Add run to queue + const redis = await redisClient() + await redis.rPush(getRunQueueKey(), run.id.toString()) + console.log(`Run ${run.id} added to queue`) + + // Try to dispatch if no active run try { - const isRunningInDocker = fs.existsSync("/.dockerenv") + await dispatchNextRun() + } catch (error) { + console.error("Error dispatching run:", error) + } - const dockerArgs = [ - `--name evals-controller-${run.id}`, - "--rm", - "--network evals_default", - "-v /var/run/docker.sock:/var/run/docker.sock", - "-v /tmp/evals:/var/log/evals", - "-e HOST_EXECUTION_METHOD=docker", - ] + return run +} - const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${run.id}` +export async function deleteRun(runId: number) { + await _deleteRun(runId) + revalidatePath("/runs") +} - const command = isRunningInDocker - ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` - : cliCommand +export async function cancelQueuedRun(runId: number) { + const redis = await redisClient() - console.log("spawn ->", command) + // Remove from queue + const removed = await redis.lRem(getRunQueueKey(), 1, runId.toString()) - const childProcess = spawn("sh", ["-c", command], { - detached: true, - stdio: ["ignore", "pipe", "pipe"], - }) + if (removed > 0) { + console.log(`Removed run ${runId} from queue`) + // Delete the run from database + await deleteRun(runId) + return true + } - const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) + return false +} - if (childProcess.stdout) { - childProcess.stdout.pipe(logStream) - } +export async function getRunQueueStatus(runId: number) { + const redis = await redisClient() - if (childProcess.stderr) { - childProcess.stderr.pipe(logStream) - } + // Check if run is active + const activeRunId = await redis.get(getActiveRunKey()) + if (activeRunId === runId.toString()) { + return { status: "running" as const, position: null } + } - childProcess.unref() - } catch (error) { - console.error(error) + // Check position in queue + const queue = await redis.lRange(getRunQueueKey(), 0, -1) + const position = queue.indexOf(runId.toString()) + + if (position !== -1) { + return { status: "queued" as const, position: position + 1 } } - return run -} + // Check if run has a heartbeat (running but not marked as active - edge case) + const heartbeat = await redis.get(`heartbeat:${runId}`) + if (heartbeat) { + return { status: "running" as const, position: null } + } -export async function deleteRun(runId: number) { - await _deleteRun(runId) - revalidatePath("/runs") + // Run is completed or not found + const run = await findRun(runId) + if (run?.taskMetricsId) { + return { status: "completed" as const, position: null } + } + + return { status: "unknown" as const, position: null } } diff --git a/apps/web-evals/src/components/home/run.tsx b/apps/web-evals/src/components/home/run.tsx index c35673885c3..e16c734af5c 100644 --- a/apps/web-evals/src/components/home/run.tsx +++ b/apps/web-evals/src/components/home/run.tsx @@ -1,10 +1,11 @@ -import { useCallback, useState, useRef } from "react" +import { useCallback, useState, useRef, useEffect } from "react" import Link from "next/link" -import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash } from "lucide-react" +import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash, X, Clock, Play, CheckCircle } from "lucide-react" import type { Run as EvalsRun, TaskMetrics as EvalsTaskMetrics } from "@roo-code/evals" -import { deleteRun } from "@/actions/runs" +import { deleteRun, cancelQueuedRun, getRunQueueStatus } from "@/actions/runs" +import { getHeartbeat } from "@/actions/heartbeat" import { formatCurrency, formatDuration, formatTokens, formatToolUsageSuccessRate } from "@/lib/formatters" import { useCopyRun } from "@/hooks/use-copy-run" import { @@ -23,6 +24,7 @@ import { AlertDialogFooter, AlertDialogHeader, AlertDialogTitle, + Badge, } from "@/components/ui" type RunProps = { @@ -30,11 +32,57 @@ type RunProps = { taskMetrics: EvalsTaskMetrics | null } +type RunStatus = { + status: "running" | "queued" | "completed" | "unknown" + position: number | null +} + export function Run({ run, taskMetrics }: RunProps) { const [deleteRunId, setDeleteRunId] = useState() + const [runStatus, setRunStatus] = useState({ status: "unknown", position: null }) + const [isLoadingStatus, setIsLoadingStatus] = useState(true) + const [isCancelling, setIsCancelling] = useState(false) const continueRef = useRef(null) const { isPending, copyRun, copied } = useCopyRun(run.id) + // Fetch run status on mount and periodically + useEffect(() => { + const fetchStatus = async () => { + try { + // First check if run is completed + if (run.taskMetricsId) { + setRunStatus({ status: "completed", position: null }) + setIsLoadingStatus(false) + return + } + + // Check heartbeat for running status + const heartbeat = await getHeartbeat(run.id) + if (heartbeat) { + setRunStatus({ status: "running", position: null }) + setIsLoadingStatus(false) + return + } + + // Get queue status + const status = await getRunQueueStatus(run.id) + setRunStatus(status) + } catch (error) { + console.error("Error fetching run status:", error) + } finally { + setIsLoadingStatus(false) + } + } + + fetchStatus() + // Refresh status every 5 seconds for non-completed runs + const interval = !run.taskMetricsId ? setInterval(fetchStatus, 5000) : null + + return () => { + if (interval) clearInterval(interval) + } + }, [run.id, run.taskMetricsId]) + const onConfirmDelete = useCallback(async () => { if (!deleteRunId) { return @@ -48,9 +96,57 @@ export function Run({ run, taskMetrics }: RunProps) { } }, [deleteRunId]) + const handleCancelQueued = useCallback(async () => { + setIsCancelling(true) + try { + const cancelled = await cancelQueuedRun(run.id) + if (cancelled) { + // Refresh the page to update the list + window.location.reload() + } + } catch (error) { + console.error("Error cancelling queued run:", error) + } finally { + setIsCancelling(false) + } + }, [run.id]) + + const getStatusBadge = () => { + if (isLoadingStatus) { + return Loading... + } + + switch (runStatus.status) { + case "running": + return ( + + + Running + + ) + case "queued": + return ( + + + Queued #{runStatus.position} + + ) + case "completed": + return ( + + + Completed + + ) + default: + return Unknown + } + } + return ( <> + {getStatusBadge()} {run.model} {run.passed} {run.failed} @@ -79,55 +175,71 @@ export function Run({ run, taskMetrics }: RunProps) { {taskMetrics && formatCurrency(taskMetrics.cost)} {taskMetrics && formatDuration(taskMetrics.duration)} - - - - - -
- -
View Tasks
-
- -
- {run.taskMetricsId && ( - copyRun()} disabled={isPending || copied}> +
+ {runStatus.status === "queued" && ( + + )} + + + + + +
+ +
View Tasks
+
+ +
+ {run.taskMetricsId && ( + copyRun()} disabled={isPending || copied}> +
+ {isPending ? ( + <> + + Copying... + + ) : copied ? ( + <> + + Copied! + + ) : ( + <> + + Copy to Production + + )} +
+
+ )} + { + setDeleteRunId(run.id) + setTimeout(() => continueRef.current?.focus(), 0) + }}>
- {isPending ? ( - <> - - Copying... - - ) : copied ? ( - <> - - Copied! - - ) : ( - <> - - Copy to Production - - )} + +
Delete
- )} - { - setDeleteRunId(run.id) - setTimeout(() => continueRef.current?.focus(), 0) - }}> -
- -
Delete
-
-
-
-
+ + +
setDeleteRunId(undefined)}> diff --git a/apps/web-evals/src/components/home/runs.tsx b/apps/web-evals/src/components/home/runs.tsx index 8bc8739b28e..547c3ac4486 100644 --- a/apps/web-evals/src/components/home/runs.tsx +++ b/apps/web-evals/src/components/home/runs.tsx @@ -18,6 +18,7 @@ export function Runs({ runs }: { runs: RunWithTaskMetrics[] }) { + Status Model Passed Failed @@ -34,7 +35,7 @@ export function Runs({ runs }: { runs: RunWithTaskMetrics[] }) { runs.map(({ taskMetrics, ...run }) => ) ) : ( - + No eval runs yet.