diff --git a/.agents/skills/code-review.md b/.agents/skills/code-review.md index 763989fa72..69930a7530 100644 --- a/.agents/skills/code-review.md +++ b/.agents/skills/code-review.md @@ -15,6 +15,8 @@ You have permission to **APPROVE** or **COMMENT** on PRs. Do not use REQUEST_CHA **Default to APPROVE**: If your review finds no issues at "important" level or higher, approve the PR. Minor suggestions or nitpicks alone are not sufficient reason to withhold approval. +**IMPORTANT: If you determine a PR is worth merging, you MUST approve it.** Do not just say a PR is "worth merging" or "ready to merge" without actually submitting an approval. Your words and actions must be consistent. + ### When to APPROVE Approve PRs that are straightforward and low-risk: diff --git a/AGENTS.md b/AGENTS.md index 0c51d8568f..378d9d59a8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -280,6 +280,21 @@ git push -u origin ``` + +# Agent Temporary Directory Convention + +When tools need to store observation files (e.g., browser session recordings, task tracker data), use `.agent_tmp` as the directory name for consistency. + +The browser session recording tool saves recordings to `.agent_tmp/observations/recording-{timestamp}/`. + +This convention ensures tool-generated observation files are stored in a predictable location that can be easily: +- Added to `.gitignore` +- Cleaned up after agent sessions +- Identified as agent-generated artifacts + +Note: This is separate from `persistence_dir` which is used for conversation state persistence. + + - `openhands-sdk/` core SDK; `openhands-tools/` built-in tools; `openhands-workspace/` workspace management; `openhands-agent-server/` server runtime; `examples/` runnable patterns; `tests/` split by domain (`tests/sdk`, `tests/tools`, `tests/agent_server`, etc.). diff --git a/examples/01_standalone_sdk/38_browser_session_recording.py b/examples/01_standalone_sdk/38_browser_session_recording.py new file mode 100644 index 0000000000..16f35f7501 --- /dev/null +++ b/examples/01_standalone_sdk/38_browser_session_recording.py @@ -0,0 +1,178 @@ +"""Browser Session Recording Example + +This example demonstrates how to use the browser session recording feature +to capture and save a recording of the agent's browser interactions using rrweb. + +The recording can be replayed later using rrweb-player to visualize the agent's +browsing session. + +The recording will be automatically saved to the persistence directory when +browser_stop_recording is called. You can replay it with: + - rrweb-player: https://github.com/rrweb-io/rrweb/tree/master/packages/rrweb-player + - Online viewer: https://www.rrweb.io/demo/ +""" + +import json +import os + +from pydantic import SecretStr + +from openhands.sdk import ( + LLM, + Agent, + Conversation, + Event, + LLMConvertibleEvent, + get_logger, +) +from openhands.sdk.tool import Tool +from openhands.tools.browser_use import BrowserToolSet +from openhands.tools.browser_use.definition import BROWSER_RECORDING_OUTPUT_DIR + + +logger = get_logger(__name__) + +# Configure LLM +api_key = os.getenv("LLM_API_KEY") +assert api_key is not None, "LLM_API_KEY environment variable is not set." +model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929") +base_url = os.getenv("LLM_BASE_URL") +llm = LLM( + usage_id="agent", + model=model, + base_url=base_url, + api_key=SecretStr(api_key), +) + +# Tools - including browser tools with recording capability +cwd = os.getcwd() +tools = [ + Tool(name=BrowserToolSet.name), +] + +# Agent +agent = Agent(llm=llm, tools=tools) + +llm_messages = [] # collect raw LLM messages + + +def conversation_callback(event: Event): + if isinstance(event, LLMConvertibleEvent): + llm_messages.append(event.to_llm_message()) + + +# Create conversation with persistence_dir set to save browser recordings +conversation = Conversation( + agent=agent, + callbacks=[conversation_callback], + workspace=cwd, + persistence_dir="./.conversations", +) + +# The prompt instructs the agent to: +# 1. Start recording the browser session +# 2. Browse to a website and perform some actions +# 3. Stop recording (auto-saves to file) +PROMPT = """ +Please complete the following task to demonstrate browser session recording: + +1. First, use `browser_start_recording` to begin recording the browser session. + +2. Then navigate to https://docs.openhands.dev/ and: + - Get the page content + - Scroll down the page + - Get the browser state to see interactive elements + +3. Next, navigate to https://docs.openhands.dev/openhands/usage/cli/installation and: + - Get the page content + - Scroll down to see more content + +4. Finally, use `browser_stop_recording` to stop the recording. + Events are automatically saved. +""" + +print("=" * 80) +print("Browser Session Recording Example") +print("=" * 80) +print("\nTask: Record an agent's browser session and save it for replay") +print("\nStarting conversation with agent...\n") + +conversation.send_message(PROMPT) +conversation.run() + +print("\n" + "=" * 80) +print("Conversation finished!") +print("=" * 80) + +# Check if the recording files were created +# Recordings are saved in BROWSER_RECORDING_OUTPUT_DIR/recording-{timestamp}/ +if os.path.exists(BROWSER_RECORDING_OUTPUT_DIR): + # Find recording subdirectories (they start with "recording-") + recording_dirs = sorted( + [ + d + for d in os.listdir(BROWSER_RECORDING_OUTPUT_DIR) + if d.startswith("recording-") + and os.path.isdir(os.path.join(BROWSER_RECORDING_OUTPUT_DIR, d)) + ] + ) + + if recording_dirs: + # Process the most recent recording directory + latest_recording = recording_dirs[-1] + recording_path = os.path.join(BROWSER_RECORDING_OUTPUT_DIR, latest_recording) + json_files = sorted( + [f for f in os.listdir(recording_path) if f.endswith(".json")] + ) + + print(f"\n✓ Recording saved to: {recording_path}") + print(f"✓ Number of files: {len(json_files)}") + + # Count total events across all files + total_events = 0 + all_event_types: dict[int | str, int] = {} + total_size = 0 + + for json_file in json_files: + filepath = os.path.join(recording_path, json_file) + file_size = os.path.getsize(filepath) + total_size += file_size + + with open(filepath) as f: + events = json.load(f) + + # Events are stored as a list in each file + if isinstance(events, list): + total_events += len(events) + for event in events: + event_type = event.get("type", "unknown") + all_event_types[event_type] = all_event_types.get(event_type, 0) + 1 + + print(f" - {json_file}: {len(events)} events, {file_size} bytes") + + print(f"✓ Total events: {total_events}") + print(f"✓ Total size: {total_size} bytes") + if all_event_types: + print(f"✓ Event types: {all_event_types}") + + print("\nTo replay this recording, you can use:") + print( + " - rrweb-player: " + "https://github.com/rrweb-io/rrweb/tree/master/packages/rrweb-player" + ) + else: + print(f"\n✗ No recording directories found in: {BROWSER_RECORDING_OUTPUT_DIR}") + print(" The agent may not have completed the recording task.") +else: + print(f"\n✗ Observations directory not found: {BROWSER_RECORDING_OUTPUT_DIR}") + print(" The agent may not have completed the recording task.") + +print("\n" + "=" * 100) +print("Conversation finished.") +print(f"Total LLM messages: {len(llm_messages)}") +print("=" * 100) + +# Report cost +cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost +print(f"Conversation ID: {conversation.id}") +print(f"EXAMPLE_COST: {cost}") diff --git a/openhands-tools/openhands/tools/browser_use/definition.py b/openhands-tools/openhands/tools/browser_use/definition.py index 968f00c653..d20945dbb3 100644 --- a/openhands-tools/openhands/tools/browser_use/definition.py +++ b/openhands-tools/openhands/tools/browser_use/definition.py @@ -2,6 +2,7 @@ import base64 import hashlib +import os from collections.abc import Sequence from pathlib import Path from typing import TYPE_CHECKING, Literal, Self @@ -25,6 +26,9 @@ from openhands.tools.browser_use.impl import BrowserToolExecutor +# Directory where browser session recordings are saved +BROWSER_RECORDING_OUTPUT_DIR = os.path.join(".agent_tmp", "browser_observations") + # Mapping of base64 prefixes to MIME types for image detection BASE64_IMAGE_PREFIXES = { "/9j/": "image/jpeg", @@ -668,6 +672,103 @@ def create(cls, executor: "BrowserToolExecutor") -> Sequence[Self]: ] +# ============================================ +# `browser_start_recording` +# ============================================ +class BrowserStartRecordingAction(BrowserAction): + """Schema for starting browser session recording.""" + + pass + + +BROWSER_START_RECORDING_DESCRIPTION = f"""Start recording the browser session. + +This tool starts recording all browser interactions using rrweb. The recording +captures DOM mutations, mouse movements, clicks, scrolls, and other user interactions. + +Output Location: {BROWSER_RECORDING_OUTPUT_DIR}/recording-/ +Format: Recording events are saved as numbered JSON files (1.json, 2.json, etc.) +containing rrweb event arrays. Events are flushed every 5 seconds or when they +exceed 1 MB. These files can be replayed using rrweb-player. + +Call browser_stop_recording to stop recording and save any remaining events. + +Note: Recording persists across page navigations - the recording will automatically +restart on new pages. +""" + + +class BrowserStartRecordingTool( + ToolDefinition[BrowserStartRecordingAction, BrowserObservation] +): + """Tool for starting browser session recording.""" + + @classmethod + def create(cls, executor: "BrowserToolExecutor") -> Sequence[Self]: + return [ + cls( + description=BROWSER_START_RECORDING_DESCRIPTION, + action_type=BrowserStartRecordingAction, + observation_type=BrowserObservation, + annotations=ToolAnnotations( + title="browser_start_recording", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=False, + ), + executor=executor, + ) + ] + + +# ============================================ +# `browser_stop_recording` +# ============================================ +class BrowserStopRecordingAction(BrowserAction): + """Schema for stopping browser session recording.""" + + pass + + +BROWSER_STOP_RECORDING_DESCRIPTION = f"""Stop recording the browser session. + +This tool stops the current recording session and saves any remaining events to disk. + +Output Location: {BROWSER_RECORDING_OUTPUT_DIR}/recording-/ +Format: Events are saved as numbered JSON files (1.json, 2.json, etc.) containing +rrweb event arrays. These files can be replayed using rrweb-player to visualize +the recorded session. + +Returns a summary message with the total event count, file count, and save directory. +""" + + +class BrowserStopRecordingTool( + ToolDefinition[BrowserStopRecordingAction, BrowserObservation] +): + """Tool for stopping browser session recording.""" + + @classmethod + def create(cls, executor: "BrowserToolExecutor") -> Sequence[Self]: + return [ + cls( + description=BROWSER_STOP_RECORDING_DESCRIPTION, + action_type=BrowserStopRecordingAction, + observation_type=BrowserObservation, + annotations=ToolAnnotations( + title="browser_stop_recording", + # Modifies state: stops recording, flushes events to disk + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=False, + ), + executor=executor, + ) + ] + + class BrowserToolSet(ToolDefinition[BrowserAction, BrowserObservation]): """A set of all browser tools. @@ -721,6 +822,8 @@ def create( BrowserCloseTabTool, BrowserGetStorageTool, BrowserSetStorageTool, + BrowserStartRecordingTool, + BrowserStopRecordingTool, ]: tools.extend(tool_class.create(executor)) return tools diff --git a/openhands-tools/openhands/tools/browser_use/event_storage.py b/openhands-tools/openhands/tools/browser_use/event_storage.py new file mode 100644 index 0000000000..c02192865c --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/event_storage.py @@ -0,0 +1,68 @@ +"""Persistent storage for browser recording events.""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass, field +from datetime import UTC, datetime + +from openhands.sdk import get_logger + + +logger = get_logger(__name__) + + +@dataclass +class EventStorage: + """Handles persistent storage of recording events to disk.""" + + output_dir: str | None = None + _session_dir: str | None = field(default=None, repr=False) + _files_written: int = 0 + _total_events: int = 0 + + @property + def session_dir(self) -> str | None: + return self._session_dir + + @property + def file_count(self) -> int: + return self._files_written + + @property + def total_events(self) -> int: + return self._total_events + + def create_session_subfolder(self) -> str | None: + """Create a timestamped subfolder for this recording session.""" + if not self.output_dir: + return None + timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S-%f") + subfolder = os.path.join(self.output_dir, f"recording-{timestamp}") + os.makedirs(subfolder, exist_ok=True) + self._session_dir = subfolder + return subfolder + + def save_events(self, events: list[dict]) -> str | None: + """Save events to a timestamped JSON file.""" + if not self._session_dir or not events: + return None + + os.makedirs(self._session_dir, exist_ok=True) + timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S-%f") + filepath = os.path.join(self._session_dir, f"{timestamp}.json") + + with open(filepath, "w") as f: + json.dump(events, f) + + self._files_written += 1 + self._total_events += len(events) + logger.debug(f"Saved {len(events)} events to {filepath}") + return filepath + + def reset(self) -> None: + """Reset storage state for a new session.""" + self._session_dir = None + self._files_written = 0 + self._total_events = 0 diff --git a/openhands-tools/openhands/tools/browser_use/impl.py b/openhands-tools/openhands/tools/browser_use/impl.py index d657112db8..1488615559 100644 --- a/openhands-tools/openhands/tools/browser_use/impl.py +++ b/openhands-tools/openhands/tools/browser_use/impl.py @@ -1,12 +1,16 @@ """Browser tool executor implementation using browser-use MCP server wrapper.""" +from __future__ import annotations + +import functools import json import logging import os import shutil import subprocess +from collections.abc import Callable, Coroutine from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypeVar if TYPE_CHECKING: @@ -16,11 +20,64 @@ from openhands.sdk.tool import ToolExecutor from openhands.sdk.utils import sanitized_env from openhands.sdk.utils.async_executor import AsyncExecutor -from openhands.tools.browser_use.definition import BrowserAction, BrowserObservation +from openhands.tools.browser_use.definition import ( + BROWSER_RECORDING_OUTPUT_DIR, + BrowserAction, + BrowserObservation, +) from openhands.tools.browser_use.server import CustomBrowserUseServer from openhands.tools.utils.timeout import TimeoutError, run_with_timeout +F = TypeVar("F", bound=Callable[..., Coroutine[Any, Any, Any]]) + + +def recording_aware( + func: Callable[..., Coroutine[Any, Any, Any]], +) -> Callable[..., Coroutine[Any, Any, Any]]: + """Decorator that handles recording flush before/after navigation operations. + + This decorator: + 1. Flushes recording events before the operation (to preserve them) + 2. Executes the operation + 3. Restarts recording on the new page if recording was active + + Error Handling Policy (see recording.py module docstring for full details): + - Recording is a secondary feature that should never block browser operations + - AttributeError: silent pass (recording not initialized - expected) + - Other exceptions: log at DEBUG, don't interrupt navigation + """ + + @functools.wraps(func) + async def wrapper(self: BrowserToolExecutor, *args: Any, **kwargs: Any) -> Any: + is_recording = self._server._is_recording + if is_recording: + try: + await self._server._flush_recording_events() + except AttributeError: + # Recording not initialized - expected, silent pass + pass + except Exception as e: + # Internal operation: log at DEBUG, don't interrupt navigation + logger.debug(f"Recording flush before {func.__name__} skipped: {e}") + + result = await func(self, *args, **kwargs) + + if is_recording: + try: + await self._server._restart_recording_on_new_page() + except AttributeError: + # Recording not initialized - expected, silent pass + pass + except Exception as e: + # Internal operation: log at DEBUG, don't interrupt navigation + logger.debug(f"Recording restart after {func.__name__} skipped: {e}") + + return result + + return wrapper + + # Suppress browser-use logging for cleaner integration if DEBUG: logging.getLogger("browser_use").setLevel(logging.DEBUG) @@ -123,7 +180,8 @@ def check_chromium_available(self) -> str | None: for chromium_dir in chromium_dirs: # Check platform-specific paths possible_paths = [ - chromium_dir / "chrome-linux" / "chrome", # Linux + chromium_dir / "chrome-linux" / "chrome", # Linux (old) + chromium_dir / "chrome-linux64" / "chrome", # Linux (new) chromium_dir / "chrome-mac" / "Chromium.app" @@ -156,6 +214,7 @@ def __init__( session_timeout_minutes: int = 30, init_timeout_seconds: int = 30, full_output_save_dir: str | None = None, + inject_scripts: list[str] | None = None, **config, ): """Initialize BrowserToolExecutor with timeout protection. @@ -166,7 +225,11 @@ def __init__( session_timeout_minutes: Browser session timeout in minutes init_timeout_seconds: Timeout for browser initialization in seconds full_output_save_dir: Absolute path to directory to save full output - logs and files, used when truncation is needed. + logs and files, used when truncation is needed. + inject_scripts: List of JavaScript code strings to inject into every + new document. Scripts are injected via CDP's + Page.addScriptToEvaluateOnNewDocument and run before page scripts. + Useful for injecting recording tools like rrweb. **config: Additional configuration options """ @@ -180,6 +243,10 @@ def init_logic(): headless = False # Force headless off if VNC is enabled logger.info("VNC is enabled - running browser in non-headless mode") + # Configure scripts to inject + if inject_scripts: + self._server.set_inject_scripts(inject_scripts) + self._config = { "headless": headless, "allowed_domains": allowed_domains or [], @@ -202,7 +269,7 @@ def init_logic(): def __call__( self, action: BrowserAction, - conversation: "LocalConversation | None" = None, # noqa: ARG002 + conversation: LocalConversation | None = None, # noqa: ARG002 ): """Submit an action to run in the background loop and wait for result.""" return self._async_executor.run_async( @@ -223,6 +290,8 @@ async def _execute_action(self, action): BrowserObservation, BrowserScrollAction, BrowserSetStorageAction, + BrowserStartRecordingAction, + BrowserStopRecordingAction, BrowserSwitchTabAction, BrowserTypeAction, ) @@ -256,6 +325,10 @@ async def _execute_action(self, action): result = await self.switch_tab(action.tab_id) elif isinstance(action, BrowserCloseTabAction): result = await self.close_tab(action.tab_id) + elif isinstance(action, BrowserStartRecordingAction): + result = await self.start_recording() + elif isinstance(action, BrowserStopRecordingAction): + result = await self.stop_recording() else: error_msg = f"Unsupported action type: {type(action)}" return BrowserObservation.from_text( @@ -283,20 +356,26 @@ async def _ensure_initialized(self): if not self._initialized: # Initialize browser session with our config await self._server._init_browser_session(**self._config) + # Inject any configured user scripts after session is ready + # Note: rrweb scripts are injected lazily when recording starts + await self._server._inject_scripts_to_session() self._initialized = True # Navigation & Browser Control Methods + @recording_aware async def navigate(self, url: str, new_tab: bool = False) -> str: """Navigate to a URL.""" await self._ensure_initialized() return await self._server._navigate(url, new_tab) + @recording_aware async def go_back(self) -> str: """Go back in browser history.""" await self._ensure_initialized() return await self._server._go_back() # Page Interaction + @recording_aware async def click(self, index: int, new_tab: bool = False) -> str: """Click an element by index.""" await self._ensure_initialized() @@ -376,6 +455,29 @@ async def get_content(self, extract_links: bool, start_from_char: int) -> str: extract_links=extract_links, start_from_char=start_from_char ) + # Session Recording + async def start_recording(self) -> str: + """Start recording the browser session using rrweb. + + Recording events are periodically flushed to timestamped JSON files + in a session subfolder under BROWSER_RECORDING_OUTPUT_DIR. + Events are flushed every 5 seconds. + """ + await self._ensure_initialized() + return await self._server._start_recording( + output_dir=BROWSER_RECORDING_OUTPUT_DIR + ) + + async def stop_recording(self) -> str: + """Stop recording and save remaining events to file. + + Stops the periodic flush, collects any remaining events, and saves + them to a final numbered JSON file. Returns a summary message with + the total events and file count. + """ + await self._ensure_initialized() + return await self._server._stop_recording() + async def close_browser(self) -> str: """Close the browser session.""" if self._initialized: diff --git a/openhands-tools/openhands/tools/browser_use/js/flush-events.js b/openhands-tools/openhands/tools/browser_use/js/flush-events.js new file mode 100644 index 0000000000..85020931c1 --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/js/flush-events.js @@ -0,0 +1,6 @@ +(function() { + var events = window.__rrweb_events || []; + // Clear browser-side events after flushing + window.__rrweb_events = []; + return JSON.stringify({events: events}); +})(); diff --git a/openhands-tools/openhands/tools/browser_use/js/rrweb-loader.js b/openhands-tools/openhands/tools/browser_use/js/rrweb-loader.js new file mode 100644 index 0000000000..415cb4ebf7 --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/js/rrweb-loader.js @@ -0,0 +1,60 @@ +(function() { + if (window.__rrweb_loaded) return; + window.__rrweb_loaded = true; + + // Initialize storage for events (per-page, will be flushed to backend) + window.__rrweb_events = window.__rrweb_events || []; + // Flag to indicate if recording should auto-start on new pages (cross-page) + // This is ONLY set after explicit start_recording call, not on initial load + window.__rrweb_should_record = window.__rrweb_should_record || false; + // Flag to track if rrweb failed to load + window.__rrweb_load_failed = false; + + // Create a Promise that resolves when rrweb loads (event-driven waiting) + var resolveReady; + window.__rrweb_ready_promise = new Promise(function(resolve) { + resolveReady = resolve; + }); + + function loadRrweb() { + var s = document.createElement('script'); + s.src = '{{CDN_URL}}'; + s.onload = function() { + window.__rrweb_ready = true; + console.log('[rrweb] Loaded successfully from CDN'); + resolveReady({success: true}); + // Auto-start recording ONLY if flag is set (for cross-page continuity) + // This flag is only true after an explicit start_recording call + if (window.__rrweb_should_record && !window.__rrweb_stopFn) { + window.startRecordingInternal(); + } + }; + s.onerror = function() { + console.error('[rrweb] Failed to load from CDN'); + window.__rrweb_load_failed = true; + resolveReady({success: false, error: 'load_failed'}); + }; + (document.head || document.documentElement).appendChild(s); + } + + // Internal function to start recording (used for auto-start on navigation) + window.startRecordingInternal = function() { + var recordFn = (typeof rrweb !== 'undefined' && rrweb.record) || + (typeof rrwebRecord !== 'undefined' && rrwebRecord.record); + if (!recordFn || window.__rrweb_stopFn) return; + + window.__rrweb_events = []; + window.__rrweb_stopFn = recordFn({ + emit: function(event) { + window.__rrweb_events.push(event); + } + }); + console.log('[rrweb] Auto-started recording on new page'); + }; + + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', loadRrweb); + } else { + loadRrweb(); + } +})(); diff --git a/openhands-tools/openhands/tools/browser_use/js/start-recording-simple.js b/openhands-tools/openhands/tools/browser_use/js/start-recording-simple.js new file mode 100644 index 0000000000..95ca7fe565 --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/js/start-recording-simple.js @@ -0,0 +1,14 @@ +(function() { + var recordFn = (typeof rrweb !== 'undefined' && rrweb.record) || + (typeof rrwebRecord !== 'undefined' && rrwebRecord.record); + if (!recordFn) return {status: 'not_loaded'}; + if (window.__rrweb_stopFn) return {status: 'already_recording'}; + + window.__rrweb_events = []; + window.__rrweb_stopFn = recordFn({ + emit: function(event) { + window.__rrweb_events.push(event); + } + }); + return {status: 'started'}; +})(); diff --git a/openhands-tools/openhands/tools/browser_use/js/start-recording.js b/openhands-tools/openhands/tools/browser_use/js/start-recording.js new file mode 100644 index 0000000000..c77f307829 --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/js/start-recording.js @@ -0,0 +1,17 @@ +(function() { + if (window.__rrweb_stopFn) return {status: 'already_recording'}; + // Check if rrweb failed to load from CDN + if (window.__rrweb_load_failed) return {status: 'load_failed'}; + // rrweb UMD module exports to window.rrweb (not rrwebRecord) + var recordFn = (typeof rrweb !== 'undefined' && rrweb.record) || + (typeof rrwebRecord !== 'undefined' && rrwebRecord.record); + if (!recordFn) return {status: 'not_loaded'}; + window.__rrweb_events = []; + window.__rrweb_should_record = true; + window.__rrweb_stopFn = recordFn({ + emit: function(event) { + window.__rrweb_events.push(event); + } + }); + return {status: 'started'}; +})(); diff --git a/openhands-tools/openhands/tools/browser_use/js/stop-recording.js b/openhands-tools/openhands/tools/browser_use/js/stop-recording.js new file mode 100644 index 0000000000..73da96c9a0 --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/js/stop-recording.js @@ -0,0 +1,15 @@ +(function() { + var events = window.__rrweb_events || []; + + // Stop the recording if active + if (window.__rrweb_stopFn) { + window.__rrweb_stopFn(); + window.__rrweb_stopFn = null; + } + + // Clear flags + window.__rrweb_should_record = false; + window.__rrweb_events = []; + + return JSON.stringify({events: events}); +})(); diff --git a/openhands-tools/openhands/tools/browser_use/js/wait-for-rrweb.js b/openhands-tools/openhands/tools/browser_use/js/wait-for-rrweb.js new file mode 100644 index 0000000000..86d415389a --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/js/wait-for-rrweb.js @@ -0,0 +1,16 @@ +(function() { + // If Promise doesn't exist, scripts weren't injected yet + if (!window.__rrweb_ready_promise) { + return Promise.resolve({success: false, error: 'not_injected'}); + } + // If already loaded, return immediately + if (window.__rrweb_ready) { + return Promise.resolve({success: true}); + } + // If already failed, return immediately + if (window.__rrweb_load_failed) { + return Promise.resolve({success: false, error: 'load_failed'}); + } + // Wait for the Promise to resolve + return window.__rrweb_ready_promise; +})(); diff --git a/openhands-tools/openhands/tools/browser_use/recording.py b/openhands-tools/openhands/tools/browser_use/recording.py new file mode 100644 index 0000000000..44ec8fd062 --- /dev/null +++ b/openhands-tools/openhands/tools/browser_use/recording.py @@ -0,0 +1,570 @@ +"""Recording session management for browser session recording using rrweb. + +Error Handling Policy +===================== +Recording is a secondary feature that should never block primary browser operations. +This module follows a consistent error handling strategy based on operation type: + +1. **User-facing operations** (start, stop): + - Return descriptive error strings to the user (prefixed with "Error:") + - Log at WARNING level for unexpected errors + - Log at INFO level for expected failures (e.g., rrweb load failures) + +2. **Internal/background operations** (flush_events, periodic flush, restart): + - Log at DEBUG level and continue silently + - Never raise exceptions that would interrupt browser operations + - Return neutral values (0, None) on failure + +3. **AttributeError for "not initialized"**: + - Silent pass - this is expected when recording hasn't been set up + - Used in the recording_aware decorator in impl.py + +This policy ensures that recording failures are observable through logs but never +disrupt the user's primary browser workflow. +""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass, field +from functools import lru_cache +from pathlib import Path +from typing import TYPE_CHECKING + +from openhands.sdk import get_logger +from openhands.tools.browser_use.event_storage import EventStorage + + +if TYPE_CHECKING: + from browser_use.browser.session import BrowserSession + + +logger = get_logger(__name__) + +# Directory containing JavaScript files +_JS_DIR = Path(__file__).parent / "js" + + +# ============================================================================= +# Configuration +# ============================================================================= + + +@dataclass +class RecordingConfig: + """Configuration for recording sessions. + + CDN Dependency Note: + The cdn_url points to unpkg.com which serves npm packages. If this CDN + is unavailable (down, blocked by firewall, or slow), recording will fail + to start. For production deployments in restricted environments, consider: + - Self-hosting the rrweb library + - Using a different CDN (jsdelivr, cdnjs) + - Bundling rrweb with your application + """ + + flush_interval_seconds: float = 5.0 + rrweb_load_timeout_ms: int = 10000 # Timeout for rrweb to load from CDN + cdn_url: str = "https://unpkg.com/rrweb@2.0.0-alpha.17/dist/rrweb.umd.cjs" + + +# Default configuration +DEFAULT_CONFIG = RecordingConfig() + + +# ============================================================================= +# JavaScript Code Loading +# ============================================================================= + + +@lru_cache(maxsize=16) +def _load_js_file(filename: str) -> str: + """Load a JavaScript file from the js/ directory with caching.""" + filepath = _JS_DIR / filename + return filepath.read_text() + + +def get_rrweb_loader_js(cdn_url: str) -> str: + """Generate the rrweb loader JavaScript with the specified CDN URL.""" + template = _load_js_file("rrweb-loader.js") + return template.replace("{{CDN_URL}}", cdn_url) + + +def _get_flush_events_js() -> str: + """Get the JavaScript to flush recording events from browser to Python.""" + return _load_js_file("flush-events.js") + + +def _get_start_recording_simple_js() -> str: + """Get the JavaScript to start recording on a page (simple version).""" + return _load_js_file("start-recording-simple.js") + + +def _get_start_recording_js() -> str: + """Get the JavaScript to start recording (full version with load failure check).""" + return _load_js_file("start-recording.js") + + +def _get_stop_recording_js() -> str: + """Get the JavaScript to stop recording and collect remaining events.""" + return _load_js_file("stop-recording.js") + + +def _get_wait_for_rrweb_js() -> str: + """Get the JavaScript to wait for rrweb to load using Promise.""" + return _load_js_file("wait-for-rrweb.js") + + +# ============================================================================= +# RecordingSession Class +# ============================================================================= + + +@dataclass +class RecordingSession: + """Manages browser session recording using rrweb. + + Concurrency: Uses asyncio.Lock to protect _events buffer from concurrent + access by the periodic flush loop and navigation flushes. + """ + + output_dir: str | None = None + config: RecordingConfig = field(default_factory=lambda: DEFAULT_CONFIG) + + _storage: EventStorage = field(default_factory=EventStorage, repr=False) + _is_recording: bool = False + _events: list[dict] = field(default_factory=list) + _flush_task: asyncio.Task | None = field(default=None, repr=False) + _scripts_injected: bool = False + _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) + _consecutive_flush_failures: int = 0 + + def __post_init__(self) -> None: + # Sync output_dir to storage + self._storage.output_dir = self.output_dir + + @property + def session_dir(self) -> str | None: + return self._storage.session_dir + + @property + def is_active(self) -> bool: + return self._is_recording + + @property + def total_events(self) -> int: + return self._storage.total_events + + @property + def file_count(self) -> int: + return self._storage.file_count + + @property + def events(self) -> list[dict]: + return self._events + + def _save_and_clear_events(self) -> str | None: + """Save current events to storage and clear the buffer.""" + if not self._events: + return None + filepath = self._storage.save_events(self._events) + if filepath: + self._events = [] + return filepath + + async def _set_recording_flag( + self, browser_session: BrowserSession, should_record: bool + ) -> None: + """Set the recording flag in the browser for auto-start on new pages.""" + try: + cdp_session = await browser_session.get_or_create_cdp_session() + flag_value = str(should_record).lower() + await cdp_session.cdp_client.send.Runtime.evaluate( + params={ + "expression": f"window.__rrweb_should_record = {flag_value};", + "returnByValue": True, + }, + session_id=cdp_session.session_id, + ) + except Exception as e: + # Internal op: log at DEBUG, don't interrupt (see Error Handling Policy) + logger.debug(f"Failed to set recording flag: {e}") + + async def inject_scripts(self, browser_session: BrowserSession) -> list[str]: + """Inject rrweb loader script into the browser session. + + Uses Page.addScriptToEvaluateOnNewDocument to inject scripts that + will run on every new document before the page's scripts execute. + + Returns: + List of script identifiers returned by CDP. + """ + if self._scripts_injected: + return [] + + script_ids = [] + try: + cdp_session = await browser_session.get_or_create_cdp_session() + cdp_client = cdp_session.cdp_client + + rrweb_loader = get_rrweb_loader_js(self.config.cdn_url) + result = await cdp_client.send.Page.addScriptToEvaluateOnNewDocument( + params={"source": rrweb_loader, "runImmediately": True}, + session_id=cdp_session.session_id, + ) + script_id = result.get("identifier") + if script_id: + script_ids.append(script_id) + logger.debug(f"Injected rrweb script with identifier: {script_id}") + + self._scripts_injected = True + logger.debug("Injected rrweb loader script") + except Exception as e: + # Internal op: log at DEBUG, don't interrupt (see Error Handling Policy) + logger.debug(f"Script injection skipped: {e}") + + return script_ids + + async def flush_events(self, browser_session: BrowserSession) -> int: + """Flush recording events from browser to Python storage.""" + if not self._is_recording: + return 0 + + try: + cdp_session = await browser_session.get_or_create_cdp_session() + result = await cdp_session.cdp_client.send.Runtime.evaluate( + params={"expression": _get_flush_events_js(), "returnByValue": True}, + session_id=cdp_session.session_id, + ) + + data = json.loads(result.get("result", {}).get("value", "{}")) + events = data.get("events", []) + if events: + async with self._lock: + self._events.extend(events) + logger.debug(f"Flushed {len(events)} events from browser") + + return len(events) + except Exception as e: + # Internal op: log at DEBUG, return 0 (see Error Handling Policy) + logger.debug(f"Event flush skipped: {e}") + return 0 + + async def _periodic_flush_loop(self, browser_session: BrowserSession) -> None: + """Background task that periodically flushes recording events.""" + while self._is_recording: + await asyncio.sleep(self.config.flush_interval_seconds) + if not self._is_recording: + break + + try: + await self.flush_events(browser_session) + async with self._lock: + if self._events: + filepath = self._save_and_clear_events() + if filepath: + self._consecutive_flush_failures = 0 + else: + self._consecutive_flush_failures += 1 + except Exception as e: + # Internal op: log at DEBUG, don't interrupt (see Error Handling Policy) + self._consecutive_flush_failures += 1 + logger.debug(f"Periodic flush skipped: {e}") + + # Warn after 3 consecutive failures for visibility into persistent issues + if self._consecutive_flush_failures >= 3: + logger.warning( + f"Recording flush has failed {self._consecutive_flush_failures} " + f"times. Events may be accumulating in memory. " + f"Check disk space and permissions." + ) + + async def _wait_for_rrweb_load(self, browser_session: BrowserSession) -> dict: + """Wait for rrweb to load using event-driven Promise-based waiting. + + Uses CDP's awaitPromise to wait for the rrweb loader Promise to resolve, + avoiding polling anti-patterns. This waits exactly as long as needed + and fails immediately if loading fails. + + Returns: + Dict with 'success' (bool) and optionally 'error' (str) keys. + """ + cdp_session = await browser_session.get_or_create_cdp_session() + + try: + result = await asyncio.wait_for( + cdp_session.cdp_client.send.Runtime.evaluate( + params={ + "expression": _get_wait_for_rrweb_js(), + "awaitPromise": True, + "returnByValue": True, + }, + session_id=cdp_session.session_id, + ), + timeout=self.config.rrweb_load_timeout_ms / 1000, + ) + + value = result.get("result", {}).get("value", {}) + if isinstance(value, dict): + return value + return {"success": False, "error": "unexpected_response"} + + except TimeoutError: + logger.debug(f"rrweb load timeout ({self.config.rrweb_load_timeout_ms}ms)") + return {"success": False, "error": "timeout"} + + def _initialize_session_state(self) -> None: + """Reset state and create session subfolder for a new recording session.""" + self._events = [] + self._is_recording = True + self._consecutive_flush_failures = 0 + self._storage.reset() + self._storage.output_dir = self.output_dir + self._storage.create_session_subfolder() + + async def _handle_rrweb_load_failure( + self, browser_session: BrowserSession, error: str + ) -> str: + """Handle rrweb load failure and return appropriate error message. + + Expected failure: log at INFO, return error string (see Error Handling Policy) + """ + self._is_recording = False + await self._set_recording_flag(browser_session, False) + + error_messages = { + "load_failed": ( + "Error: Unable to start recording. The rrweb library " + "failed to load from CDN. Please check network " + "connectivity and try again." + ), + "timeout": ( + "Error: Unable to start recording. rrweb did not load in time. " + "Please navigate to a page first and try again." + ), + "not_injected": ( + "Error: Unable to start recording. Scripts not injected. " + "Please navigate to a page first and try again." + ), + } + + if error in error_messages: + if error == "timeout": + logger.info( + f"Recording start failed: rrweb load timeout " + f"({self.config.rrweb_load_timeout_ms}ms)" + ) + else: + logger.info(f"Recording start failed: rrweb {error}") + return error_messages[error] + + logger.info(f"Recording start failed: {error}") + return f"Error: Unable to start recording: {error}" + + async def _ensure_rrweb_loaded(self, browser_session: BrowserSession) -> str | None: + """Wait for rrweb to load. Returns error message if failed, None on success.""" + load_result = await self._wait_for_rrweb_load(browser_session) + + if not load_result.get("success"): + error = load_result.get("error", "unknown") + return await self._handle_rrweb_load_failure(browser_session, error) + + return None + + async def _start_flush_task(self, browser_session: BrowserSession) -> None: + """Start the periodic flush task if not already running.""" + if not self._flush_task: + self._flush_task = asyncio.create_task( + self._periodic_flush_loop(browser_session) + ) + + async def _execute_start_recording(self, browser_session: BrowserSession) -> str: + """Execute the start recording JS and handle the result status.""" + cdp_session = await browser_session.get_or_create_cdp_session() + + result = await cdp_session.cdp_client.send.Runtime.evaluate( + params={"expression": _get_start_recording_js(), "returnByValue": True}, + session_id=cdp_session.session_id, + ) + + value = result.get("result", {}).get("value", {}) + status = value.get("status") if isinstance(value, dict) else value + + if status == "started": + await self._set_recording_flag(browser_session, True) + await self._start_flush_task(browser_session) + logger.info("Recording started") + return "Recording started" + + if status == "already_recording": + await self._set_recording_flag(browser_session, True) + await self._start_flush_task(browser_session) + logger.debug("Recording already active") + return "Already recording" + + if status == "load_failed": + return await self._handle_rrweb_load_failure(browser_session, "load_failed") + + self._is_recording = False + logger.info(f"Recording start failed: unknown status '{status}'") + return f"Unknown status: {status}" + + async def start(self, browser_session: BrowserSession) -> str: + """Start rrweb session recording. + + Uses event-driven Promise-based waiting for rrweb to load, avoiding + polling anti-patterns. This waits exactly as long as needed and fails + immediately if loading fails. + + Each recording session creates a new timestamped subfolder under output_dir + to ensure multiple start/stop cycles don't mix events. + + Returns: + Status message indicating success or failure. + + Note: + User-facing operation: returns error strings, logs at WARNING for + unexpected errors (see Error Handling Policy in module docstring). + """ + if not self._scripts_injected: + await self.inject_scripts(browser_session) + + self._initialize_session_state() + + try: + error_msg = await self._ensure_rrweb_loaded(browser_session) + if error_msg: + return error_msg + + return await self._execute_start_recording(browser_session) + + except Exception as e: + # User-facing operation: log at WARNING, return error string + self._is_recording = False + logger.warning(f"Recording start failed: {e}") + return f"Error starting recording: {str(e)}" + + async def stop(self, browser_session: BrowserSession) -> str: + """Stop rrweb recording and save remaining events. + + Stops the periodic flush task, collects any remaining events from the + browser, and saves them to a final numbered JSON file. + + Returns: + A summary message with the save directory and file count. + + Note: + User-facing operation: returns error strings, logs at WARNING for + unexpected errors (see Error Handling Policy in module docstring). + """ + if not self._is_recording: + return "Error: Not recording. Call browser_start_recording first." + + try: + # Stop the periodic flush task first + self._is_recording = False + if self._flush_task: + self._flush_task.cancel() + try: + await self._flush_task + except (asyncio.CancelledError, Exception): + pass + self._flush_task = None + + cdp_session = await browser_session.get_or_create_cdp_session() + + # Stop recording on current page and get remaining events + result = await cdp_session.cdp_client.send.Runtime.evaluate( + params={"expression": _get_stop_recording_js(), "returnByValue": True}, + session_id=cdp_session.session_id, + ) + + current_page_data = json.loads(result.get("result", {}).get("value", "{}")) + current_page_events = current_page_data.get("events", []) + + async with self._lock: + if current_page_events: + self._events.extend(current_page_events) + if self._events: + self._save_and_clear_events() + total_events = self._storage.total_events + total_files = self._storage.file_count + + await self._set_recording_flag(browser_session, False) + session_dir_used = self._storage.session_dir + + logger.info( + f"Recording stopped: {total_events} events saved to " + f"{total_files} file(s) in {session_dir_used}" + ) + + summary = ( + f"Recording stopped. Captured {total_events} events " + f"in {total_files} file(s)." + ) + if session_dir_used: + summary += f" Saved to: {session_dir_used}" + + return summary + + except Exception as e: + # User-facing operation: log at WARNING, return error string + self._is_recording = False + if self._flush_task: + self._flush_task.cancel() + self._flush_task = None + logger.warning(f"Recording stop failed: {e}") + return f"Error stopping recording: {str(e)}" + + async def restart_on_new_page(self, browser_session: BrowserSession) -> None: + """Restart recording on a new page after navigation. + + Uses event-driven Promise-based waiting for rrweb to be ready, + then starts a new recording session. Called automatically after + navigation when recording is active. + + Note: + Internal operation: logs at DEBUG, never raises + (see Error Handling Policy in module docstring). + """ + if not self._is_recording: + return + + try: + load_result = await self._wait_for_rrweb_load(browser_session) + + if not load_result.get("success"): + error = load_result.get("error", "unknown") + logger.debug(f"Recording restart skipped: rrweb {error}") + return + + cdp_session = await browser_session.get_or_create_cdp_session() + result = await cdp_session.cdp_client.send.Runtime.evaluate( + params={ + "expression": _get_start_recording_simple_js(), + "returnByValue": True, + }, + session_id=cdp_session.session_id, + ) + + value = result.get("result", {}).get("value", {}) + status = value.get("status") if isinstance(value, dict) else value + + if status == "started": + logger.debug("Recording restarted on new page") + elif status == "already_recording": + logger.debug("Recording already active on new page") + else: + logger.debug(f"Recording restart: unexpected status '{status}'") + + except Exception as e: + # Internal op: log at DEBUG, don't interrupt (see Error Handling Policy) + logger.debug(f"Recording restart skipped: {e}") + + def reset(self) -> None: + """Reset the recording session state for reuse.""" + self._events = [] + self._is_recording = False + self._storage.reset() + self._flush_task = None diff --git a/openhands-tools/openhands/tools/browser_use/server.py b/openhands-tools/openhands/tools/browser_use/server.py index 82fa2dbd91..9a1c25b18f 100644 --- a/openhands-tools/openhands/tools/browser_use/server.py +++ b/openhands-tools/openhands/tools/browser_use/server.py @@ -2,17 +2,167 @@ from openhands.sdk import get_logger from openhands.tools.browser_use.logging_fix import LogSafeBrowserUseServer +from openhands.tools.browser_use.recording import RecordingSession logger = get_logger(__name__) +# ============================================================================= +# CustomBrowserUseServer Class +# ============================================================================= + + class CustomBrowserUseServer(LogSafeBrowserUseServer): """ Custom BrowserUseServer with a new tool for extracting web page's content in markdown. """ + def __init__(self, session_timeout_minutes: int = 10): + super().__init__(session_timeout_minutes=session_timeout_minutes) + # Scripts to inject into every new document (before page scripts run) + self._inject_scripts: list[str] = [] + # Script identifiers returned by CDP (for cleanup if needed) + self._injected_script_ids: list[str] = [] + # Recording session - encapsulates all recording state and logic + self._recording_session: RecordingSession | None = None + + @property + def _is_recording(self) -> bool: + """Check if recording is currently active.""" + return self._recording_session is not None and self._recording_session.is_active + + async def _cleanup_recording(self) -> None: + """Cleanup recording session resources. + + Stops any active recording, saves remaining events, and releases resources. + Should be called when the browser session is being closed. + """ + if self._recording_session is None: + return + + try: + # Stop recording if active to save any remaining events + if self._recording_session.is_active and self.browser_session: + await self._recording_session.stop(self.browser_session) + else: + # Just reset if not active or no browser session + self._recording_session.reset() + except Exception as e: + logger.debug(f"Recording cleanup error (non-fatal): {e}") + finally: + self._recording_session = None + + async def _close_browser(self) -> str: + """Close the browser session and cleanup recording resources.""" + await self._cleanup_recording() + return await super()._close_browser() + + async def _close_session(self, session_id: str) -> str: + """Close a specific browser session and cleanup recording if needed.""" + # Cleanup recording if closing the current session + if self.browser_session and self.browser_session.id == session_id: + await self._cleanup_recording() + return await super()._close_session(session_id) + + async def _close_all_sessions(self) -> str: + """Close all active browser sessions and cleanup recording resources.""" + await self._cleanup_recording() + return await super()._close_all_sessions() + + def set_inject_scripts(self, scripts: list[str]) -> None: + """Set scripts to be injected into every new document. + + Args: + scripts: List of JavaScript code strings to inject. + Each script will be evaluated before page scripts run. + """ + self._inject_scripts = scripts + + async def _inject_scripts_to_session(self) -> None: + """Inject configured user scripts into the browser session using CDP. + + Uses Page.addScriptToEvaluateOnNewDocument to inject scripts that + will run on every new document before the page's scripts execute. + Note: rrweb scripts are injected lazily when recording starts. + """ + if not self.browser_session or not self._inject_scripts: + return + + try: + cdp_session = await self.browser_session.get_or_create_cdp_session() + cdp_client = cdp_session.cdp_client + + for script in self._inject_scripts: + result = await cdp_client.send.Page.addScriptToEvaluateOnNewDocument( + params={"source": script, "runImmediately": True}, + session_id=cdp_session.session_id, + ) + script_id = result.get("identifier") + if script_id: + self._injected_script_ids.append(script_id) + logger.debug(f"Injected script with identifier: {script_id}") + + num_scripts = len(self._inject_scripts) + logger.info(f"Injected {num_scripts} user script(s) into browser session") + except Exception as e: + logger.warning(f"Failed to inject scripts: {e}") + + async def _flush_recording_events(self) -> int: + """Flush recording events from browser to Python storage. + + Returns the number of events flushed. + """ + if not self.browser_session or not self._recording_session: + return 0 + return await self._recording_session.flush_events(self.browser_session) + + async def _restart_recording_on_new_page(self) -> None: + """Restart recording on a new page after navigation.""" + if not self.browser_session or not self._recording_session: + return + await self._recording_session.restart_on_new_page(self.browser_session) + + async def _start_recording(self, output_dir: str | None = None) -> str: + """Start rrweb session recording. + + Recording persists across page navigations - events are periodically flushed + to timestamped JSON files in a session subfolder. + + Each recording session creates a new subfolder under output_dir with format: + {output_dir}/recording-{timestamp}/ + + Args: + output_dir: Root directory for recording files. If provided, a timestamped + subfolder will be created for this recording session. + """ + if not self.browser_session: + return "Error: No browser session active" + + # Create a new recording session with output_dir + self._recording_session = RecordingSession(output_dir=output_dir) + return await self._recording_session.start(self.browser_session) + + async def _stop_recording(self) -> str: + """Stop rrweb recording and save remaining events. + + Events are saved to the directory configured at start_recording time. + + Returns: + A summary message with the save directory and file count. + """ + if not self.browser_session: + return "Error: No browser session active" + + if not self._recording_session or not self._recording_session.is_active: + return "Error: Not recording. Call browser_start_recording first." + + result = await self._recording_session.stop(self.browser_session) + # Reset the session after stopping + self._recording_session.reset() + return result + async def _get_storage(self) -> str: """Get browser storage (cookies, local storage, session storage).""" import json diff --git a/tests/tools/browser_use/conftest.py b/tests/tools/browser_use/conftest.py index d30f51faf6..bda43149cc 100644 --- a/tests/tools/browser_use/conftest.py +++ b/tests/tools/browser_use/conftest.py @@ -14,6 +14,7 @@ def mock_browser_server(): """Create a mock CustomBrowserUseServer.""" server = MagicMock() server._init_browser_session = AsyncMock() + server._inject_scripts_to_session = AsyncMock() return server diff --git a/tests/tools/browser_use/test_browser_executor.py b/tests/tools/browser_use/test_browser_executor.py index 25377b26da..342e350ce9 100644 --- a/tests/tools/browser_use/test_browser_executor.py +++ b/tests/tools/browser_use/test_browser_executor.py @@ -140,3 +140,123 @@ async def test_browser_executor_initialization_idempotent(mock_browser_executor) # Should only be called once assert mock_browser_executor._server._init_browser_session.call_count == 1 + + +async def test_start_recording_initializes_session(mock_browser_executor): + """Test that start_recording initializes a recording session with correct state.""" + import tempfile + from unittest.mock import AsyncMock + + from openhands.tools.browser_use.recording import RecordingSession + + # Set up mock CDP session that simulates successful rrweb loading + mock_cdp_session = AsyncMock() + mock_cdp_session.session_id = "test-session" + mock_cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock( + side_effect=[ + # First call: wait for rrweb load (returns success) + {"result": {"value": {"success": True}}}, + # Second call: start recording (returns started) + {"result": {"value": {"status": "started"}}}, + ] + ) + mock_cdp_session.cdp_client.send.Page.addScriptToEvaluateOnNewDocument = AsyncMock( + return_value={"identifier": "script-1"} + ) + + mock_browser_session = AsyncMock() + mock_browser_session.get_or_create_cdp_session = AsyncMock( + return_value=mock_cdp_session + ) + + with tempfile.TemporaryDirectory() as temp_dir: + # Create a real RecordingSession and test its behavior + # Use output_dir - start() will create a timestamped subfolder + session = RecordingSession(output_dir=temp_dir) + result = await session.start(mock_browser_session) + + # Verify the session state was properly initialized + assert session.is_active is True + assert result == "Recording started" + assert session._scripts_injected is True + # Verify a timestamped subfolder was created + assert session.session_dir is not None + assert session.session_dir.startswith(temp_dir) + assert "recording-" in session.session_dir + + +async def test_stop_recording_returns_summary_with_event_counts(): + """Test that stop_recording returns accurate summary with event counts.""" + import json + import os + import tempfile + from unittest.mock import AsyncMock + + from openhands.tools.browser_use.recording import RecordingSession + + with tempfile.TemporaryDirectory() as temp_dir: + # Create a recording session in RECORDING state with some events + session = RecordingSession() + session._storage._session_dir = temp_dir + session._is_recording = True + session._scripts_injected = True + + # Pre-populate the event buffer with some events + test_events = [{"type": 3, "timestamp": i, "data": {}} for i in range(25)] + session._events.extend(test_events) + + # Set up mock CDP session for stop + mock_cdp_session = AsyncMock() + mock_cdp_session.session_id = "test-session" + # Return additional events from the browser when stopping + mock_cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock( + return_value={ + "result": { + "value": json.dumps( + {"events": [{"type": 3, "timestamp": 100, "data": {}}] * 17} + ) + } + } + ) + + mock_browser_session = AsyncMock() + mock_browser_session.get_or_create_cdp_session = AsyncMock( + return_value=mock_cdp_session + ) + + # Stop recording + result = await session.stop(mock_browser_session) + + # Verify the summary contains accurate counts + assert "Recording stopped" in result + assert "42 events" in result # 25 buffered + 17 from browser + assert "1 file(s)" in result + assert temp_dir in result + + # Verify state transition + assert session.is_active is False + + # Verify file was actually created with correct content + files = os.listdir(temp_dir) + assert len(files) == 1 + with open(os.path.join(temp_dir, files[0])) as f: + saved_events = json.load(f) + assert len(saved_events) == 42 + + +async def test_stop_recording_without_active_session_returns_error(): + """Test that stop_recording returns error when not recording.""" + from unittest.mock import AsyncMock + + from openhands.tools.browser_use.recording import RecordingSession + + # Create a session that's not recording + session = RecordingSession() + assert session.is_active is False + + mock_browser_session = AsyncMock() + + result = await session.stop(mock_browser_session) + + assert "Error" in result + assert "Not recording" in result diff --git a/tests/tools/browser_use/test_browser_executor_e2e.py b/tests/tools/browser_use/test_browser_executor_e2e.py index e3d736570f..d8a140b029 100644 --- a/tests/tools/browser_use/test_browser_executor_e2e.py +++ b/tests/tools/browser_use/test_browser_executor_e2e.py @@ -1,3 +1,4 @@ +import json import os import subprocess import tempfile @@ -18,6 +19,8 @@ BrowserObservation, BrowserScrollAction, BrowserSetStorageAction, + BrowserStartRecordingAction, + BrowserStopRecordingAction, BrowserSwitchTabAction, BrowserTypeAction, ) @@ -656,3 +659,182 @@ def test_save_screenshot(self, test_server: str): executor.close() except Exception: pass + + def test_start_recording( + self, browser_executor: BrowserToolExecutor, test_server: str + ): + """Test starting a recording session.""" + # Navigate to the test page first + navigate_action = BrowserNavigateAction(url=test_server) + browser_executor(navigate_action) + + # Start recording - now includes automatic retry + result = browser_executor(BrowserStartRecordingAction()) + + assert isinstance(result, BrowserObservation) + assert not result.is_error + assert "Recording started" in result.text + + def test_stop_recording_without_start( + self, browser_executor: BrowserToolExecutor, test_server: str + ): + """Test stopping recording when not started returns appropriate message.""" + # Navigate to the test page + navigate_action = BrowserNavigateAction(url=test_server) + browser_executor(navigate_action) + + # Wait for page to load + time.sleep(1) + + # Try to stop recording without starting + stop_action = BrowserStopRecordingAction() + result = browser_executor(stop_action) + + assert isinstance(result, BrowserObservation) + # Should return error indicating not recording + assert "Error" in result.text or "Not recording" in result.text + + def test_recording_captures_events( + self, browser_executor: BrowserToolExecutor, test_server: str + ): + """Test that recording captures browser events.""" + # Navigate to the test page + navigate_action = BrowserNavigateAction(url=test_server) + browser_executor(navigate_action) + + # Start recording - now includes automatic retry + start_result = browser_executor(BrowserStartRecordingAction()) + + assert start_result is not None + assert not start_result.is_error + assert "Recording started" in start_result.text + + # Perform some actions that should be recorded + browser_executor(BrowserScrollAction(direction="down")) + time.sleep(0.5) + browser_executor(BrowserScrollAction(direction="up")) + time.sleep(0.5) + + # Stop recording - now returns a summary message instead of JSON + stop_result = browser_executor(BrowserStopRecordingAction()) + + assert isinstance(stop_result, BrowserObservation) + assert not stop_result.is_error + + # Verify the summary message contains expected information + assert "Recording stopped" in stop_result.text + assert "events" in stop_result.text.lower() + assert "file" in stop_result.text.lower() + + # Print result for debugging + print(f"\n✓ Stop recording result: {stop_result.text}") + + def test_recording_save_to_file(self, test_server: str): + """Test that recording is saved to files in a timestamped subfolder. + + Note: Recording output goes to BROWSER_RECORDING_OUTPUT_DIR + (.agent_tmp/browser_observations/) regardless of full_output_save_dir. + """ + from openhands.tools.browser_use.definition import ( + BROWSER_RECORDING_OUTPUT_DIR, + ) + + executor = None + browser_initialized = False + try: + executor = BrowserToolExecutor( + headless=True, + session_timeout_minutes=5, + ) + + # Navigate to the test page + navigate_action = BrowserNavigateAction(url=test_server) + nav_result = executor(navigate_action) + + # Skip test if browser failed to initialize (infrastructure issue) + if nav_result.is_error or "Error" in nav_result.text: + pytest.skip(f"Browser initialization failed: {nav_result.text}") + + # Browser successfully initialized + browser_initialized = True + + # Start recording - now includes automatic retry + start_result = executor(BrowserStartRecordingAction()) + + assert start_result is not None + + # Skip test if recording couldn't start due to CDP issues + if "Error" in start_result.text or "not initialized" in start_result.text: + pytest.skip( + f"Recording could not start due to CDP issues: {start_result.text}" + ) + + assert "Recording started" in start_result.text, ( + f"Failed to start recording: {start_result.text}" + ) + + # Perform actions + executor(BrowserScrollAction(direction="down")) + time.sleep(0.5) + + # Stop recording - events are automatically saved to files + stop_result = executor(BrowserStopRecordingAction()) + assert not stop_result.is_error + + # Verify the summary message + assert "Recording stopped" in stop_result.text + assert "events" in stop_result.text.lower() + + # Verify a timestamped subfolder was created in the recording output dir + if os.path.exists(BROWSER_RECORDING_OUTPUT_DIR): + subdirs = [ + d + for d in os.listdir(BROWSER_RECORDING_OUTPUT_DIR) + if os.path.isdir(os.path.join(BROWSER_RECORDING_OUTPUT_DIR, d)) + and d.startswith("recording-") + ] + assert len(subdirs) >= 1, ( + f"Expected at least one recording subfolder in " + f"{BROWSER_RECORDING_OUTPUT_DIR}, got {subdirs}" + ) + + # Verify files were created in the most recent recording subfolder + # Sort by name (timestamp-based) to get the most recent + subdirs.sort(reverse=True) + recording_dir = os.path.join(BROWSER_RECORDING_OUTPUT_DIR, subdirs[0]) + files = os.listdir(recording_dir) + json_files = [f for f in files if f.endswith(".json")] + assert len(json_files) > 0, ( + "Expected at least one JSON file to be created" + ) + + # Read and verify the saved file(s) + total_events = 0 + for json_file in json_files: + filepath = os.path.join(recording_dir, json_file) + assert os.path.getsize(filepath) > 0 + with open(filepath) as f: + events = json.load(f) + assert isinstance(events, list) + total_events += len(events) + + assert total_events > 0, "Expected at least some events to be saved" + + print(f"\n✓ Recording saved to {recording_dir}") + print(f"✓ Created {len(json_files)} file(s)") + print(f"✓ Total events: {total_events}") + else: + # Directory doesn't exist - skip as the test cannot verify + pytest.skip( + f"Recording directory {BROWSER_RECORDING_OUTPUT_DIR} does not exist" + ) + + finally: + # Only attempt to close if browser was successfully initialized, + # as closing a broken session can hang indefinitely + if executor and browser_initialized: + try: + executor.close() + except Exception as e: + # Ignore errors during cleanup but log for debugging purposes + print(f"Warning: failed to close BrowserToolExecutor cleanly: {e}") diff --git a/tests/tools/browser_use/test_browser_toolset.py b/tests/tools/browser_use/test_browser_toolset.py index 24151b4912..00f15724fc 100644 --- a/tests/tools/browser_use/test_browser_toolset.py +++ b/tests/tools/browser_use/test_browser_toolset.py @@ -32,7 +32,7 @@ def test_browser_toolset_create_returns_list(): tools = BrowserToolSet.create(conv_state=conv_state) assert isinstance(tools, list) - assert len(tools) == 12 # All browser tools + assert len(tools) == 14 # All browser tools (including recording tools) # Verify all items are Tool instances for tool in tools: @@ -62,6 +62,8 @@ def test_browser_toolset_create_includes_all_browser_tools(): "browser_close_tab", "browser_get_storage", "browser_set_storage", + "browser_start_recording", + "browser_stop_recording", ] # Verify all expected tools are present diff --git a/tests/tools/browser_use/test_recording_flush.py b/tests/tools/browser_use/test_recording_flush.py new file mode 100644 index 0000000000..5d6c744e6e --- /dev/null +++ b/tests/tools/browser_use/test_recording_flush.py @@ -0,0 +1,460 @@ +"""Tests for browser session recording flush behavior. + +These tests verify that: +1. Recording events are periodically flushed to new file chunks +""" + +import asyncio +import json +import os +import tempfile +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from openhands.tools.browser_use.event_storage import EventStorage +from openhands.tools.browser_use.recording import ( + DEFAULT_CONFIG, + RecordingSession, +) +from openhands.tools.browser_use.server import CustomBrowserUseServer + + +# Get default config values for tests +RECORDING_FLUSH_INTERVAL_SECONDS = DEFAULT_CONFIG.flush_interval_seconds + + +@pytest.fixture +def mock_cdp_session(): + """Create a mock CDP session.""" + cdp_session = MagicMock() + cdp_session.session_id = "test-session-id" + cdp_session.cdp_client = MagicMock() + cdp_session.cdp_client.send = MagicMock() + cdp_session.cdp_client.send.Runtime = MagicMock() + cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock() + return cdp_session + + +@pytest.fixture +def mock_browser_session(mock_cdp_session): + """Create a mock browser session.""" + browser_session = MagicMock() + browser_session.get_or_create_cdp_session = AsyncMock(return_value=mock_cdp_session) + return browser_session + + +@pytest.fixture +def server_with_mock_browser(mock_browser_session): + """Create a CustomBrowserUseServer with mocked browser session.""" + server = CustomBrowserUseServer() + server.browser_session = mock_browser_session + return server + + +@pytest.fixture +def recording_session_with_mock_browser(mock_browser_session): + """Create a RecordingSession with mocked browser session.""" + return mock_browser_session, RecordingSession() + + +def create_mock_events(count: int, size_per_event: int = 100) -> list[dict]: + """Create mock rrweb events with specified count and approximate size.""" + events = [] + for i in range(count): + # Create event with padding to reach approximate size + padding = "x" * max(0, size_per_event - 50) + events.append( + { + "type": 3, + "timestamp": 1000 + i, + "data": {"source": 1, "text": padding}, + } + ) + return events + + +class TestEventStorage: + """Tests for EventStorage - no browser mocks needed.""" + + def test_save_events_creates_file(self): + """Test that save_events creates a JSON file with events.""" + with tempfile.TemporaryDirectory() as temp_dir: + storage = EventStorage(output_dir=temp_dir) + storage.create_session_subfolder() + + events = create_mock_events(10) + filepath = storage.save_events(events) + + assert filepath is not None + assert os.path.exists(filepath) + with open(filepath) as f: + saved = json.load(f) + assert len(saved) == 10 + + def test_save_events_updates_counters(self): + """Test that save_events updates file_count and total_events.""" + with tempfile.TemporaryDirectory() as temp_dir: + storage = EventStorage(output_dir=temp_dir) + storage.create_session_subfolder() + + storage.save_events(create_mock_events(5)) + assert storage.file_count == 1 + assert storage.total_events == 5 + + storage.save_events(create_mock_events(10)) + assert storage.file_count == 2 + assert storage.total_events == 15 + + def test_save_events_returns_none_without_session_dir(self): + """Test that save_events returns None if no session_dir is set.""" + storage = EventStorage() + result = storage.save_events(create_mock_events(5)) + assert result is None + + def test_save_events_returns_none_for_empty_events(self): + """Test that save_events returns None for empty event list.""" + with tempfile.TemporaryDirectory() as temp_dir: + storage = EventStorage(output_dir=temp_dir) + storage.create_session_subfolder() + result = storage.save_events([]) + assert result is None + + def test_reset_clears_state(self): + """Test that reset clears all storage state.""" + with tempfile.TemporaryDirectory() as temp_dir: + storage = EventStorage(output_dir=temp_dir) + storage.create_session_subfolder() + storage.save_events(create_mock_events(5)) + + assert storage.session_dir is not None + assert storage.file_count == 1 + + storage.reset() + + assert storage.session_dir is None + assert storage.file_count == 0 + assert storage.total_events == 0 + + +class TestPeriodicFlush: + """Tests for periodic flush behavior (every few seconds).""" + + @pytest.mark.asyncio + async def test_periodic_flush_creates_new_file_chunks( + self, mock_browser_session, mock_cdp_session + ): + """Test that periodic flush creates new file chunks every few seconds.""" + from openhands.tools.browser_use.recording import RecordingConfig + + with tempfile.TemporaryDirectory() as temp_dir: + # Create recording session with fast flush interval + config = RecordingConfig(flush_interval_seconds=0.1) # 100ms + session = RecordingSession(config=config) + session._storage._session_dir = temp_dir + session._is_recording = True + + # Mock the CDP evaluate to return events on each flush + flush_call_count = 0 + + async def mock_evaluate(*args, **kwargs): + nonlocal flush_call_count + expression = kwargs.get("params", {}).get("expression", "") + + # Return events for flush calls + if ( + "window.__rrweb_events" in expression + and "JSON.stringify" in expression + ): + flush_call_count += 1 + events = create_mock_events(10) # 10 events per flush + return {"result": {"value": json.dumps({"events": events})}} + return {"result": {"value": None}} + + mock_cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock( + side_effect=mock_evaluate + ) + + # Start the periodic flush task + flush_task = asyncio.create_task( + session._periodic_flush_loop(mock_browser_session) + ) + + # Let it run for enough time to create multiple flushes + await asyncio.sleep(0.35) # Should allow ~3 flush cycles + + # Stop recording to end the task + session._is_recording = False + await asyncio.sleep(0.15) # Allow task to exit + + # Cancel if still running + if not flush_task.done(): + flush_task.cancel() + try: + await flush_task + except asyncio.CancelledError: + pass + + # Verify: Multiple files should have been created + files = sorted(os.listdir(temp_dir)) + json_files = [f for f in files if f.endswith(".json")] + + assert len(json_files) >= 2, ( + f"Expected at least 2 file chunks from periodic flush, " + f"got {len(json_files)}: {json_files}" + ) + + # Verify each file contains valid events + for json_file in json_files: + filepath = os.path.join(temp_dir, json_file) + with open(filepath) as f: + events = json.load(f) + assert isinstance(events, list) + assert len(events) > 0 + + @pytest.mark.asyncio + async def test_periodic_flush_interval_is_configurable(self): + """Test that the flush interval constant is set correctly.""" + # Verify the default interval is 5 seconds + assert RECORDING_FLUSH_INTERVAL_SECONDS == 5 + + +class TestConcurrentFlushSafety: + """Tests for concurrent flush safety (lock protection).""" + + @pytest.mark.asyncio + async def test_concurrent_flushes_do_not_corrupt_event_buffer( + self, mock_browser_session, mock_cdp_session + ): + """Test that concurrent flushes don't corrupt the event buffer.""" + with tempfile.TemporaryDirectory() as temp_dir: + session = RecordingSession() + session._storage._session_dir = temp_dir + session._is_recording = True + + async def mock_evaluate(*args, **kwargs): + expression = kwargs.get("params", {}).get("expression", "") + if ( + "window.__rrweb_events" in expression + and "JSON.stringify" in expression + ): + events = create_mock_events(20, size_per_event=100) + return {"result": {"value": json.dumps({"events": events})}} + return {"result": {"value": None}} + + mock_cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock( + side_effect=mock_evaluate + ) + + # Trigger multiple concurrent flushes + tasks = [ + asyncio.create_task(session.flush_events(mock_browser_session)) + for _ in range(5) + ] + await asyncio.gather(*tasks) + + # Verify: Events should be accumulated in buffer (5 flushes * 20 events) + assert len(session.events) == 100 + + @pytest.mark.asyncio + async def test_periodic_flush_creates_timestamped_files( + self, mock_browser_session, mock_cdp_session + ): + """Test that periodic flush creates timestamped files that are sortable.""" + from openhands.tools.browser_use.recording import RecordingConfig + + with tempfile.TemporaryDirectory() as temp_dir: + config = RecordingConfig(flush_interval_seconds=0.05) + session = RecordingSession(config=config) + session._storage._session_dir = temp_dir + session._is_recording = True + + async def mock_evaluate(*args, **kwargs): + expression = kwargs.get("params", {}).get("expression", "") + if ( + "window.__rrweb_events" in expression + and "JSON.stringify" in expression + ): + events = create_mock_events(20, size_per_event=100) + return {"result": {"value": json.dumps({"events": events})}} + return {"result": {"value": None}} + + mock_cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock( + side_effect=mock_evaluate + ) + + flush_task = asyncio.create_task( + session._periodic_flush_loop(mock_browser_session) + ) + await asyncio.sleep(0.2) + + session._is_recording = False + await asyncio.sleep(0.1) + if not flush_task.done(): + flush_task.cancel() + try: + await flush_task + except asyncio.CancelledError: + pass + + files = sorted(os.listdir(temp_dir)) + json_files = [f for f in files if f.endswith(".json")] + + # Files should be unique and sortable by timestamp + assert len(json_files) >= 2, f"Expected at least 2 files, got {json_files}" + assert len(json_files) == len(set(json_files)), "Files should be unique" + + # Verify file integrity + for json_file in json_files: + filepath = os.path.join(temp_dir, json_file) + with open(filepath) as f: + events = json.load(f) + assert isinstance(events, list) + + +class TestRecordingIsolation: + """Tests for recording session isolation (separate subfolders).""" + + @pytest.mark.asyncio + async def test_multiple_recordings_create_separate_subfolders( + self, mock_browser_session, mock_cdp_session + ): + """Test that multiple start/stop cycles create separate subfolders.""" + import time + + with tempfile.TemporaryDirectory() as temp_dir: + # Set up mock CDP session for successful recording + # Note: stop_recording expects a JSON string, not a dict + mock_cdp_session.cdp_client.send.Runtime.evaluate = AsyncMock( + side_effect=[ + # First recording: wait for rrweb load + {"result": {"value": {"success": True}}}, + # First recording: start recording + {"result": {"value": {"status": "started"}}}, + # First recording: set recording flag (in stop) + {"result": {"value": None}}, + # First recording: stop recording (returns JSON string) + {"result": {"value": json.dumps({"events": [{"type": 3}] * 5})}}, + # First recording: set recording flag to false + {"result": {"value": None}}, + # Second recording: wait for rrweb load + {"result": {"value": {"success": True}}}, + # Second recording: start recording + {"result": {"value": {"status": "started"}}}, + # Second recording: set recording flag (in stop) + {"result": {"value": None}}, + # Second recording: stop recording (returns JSON string) + {"result": {"value": json.dumps({"events": [{"type": 3}] * 10})}}, + # Second recording: set recording flag to false + {"result": {"value": None}}, + ] + ) + mock_cdp_session.cdp_client.send.Page.addScriptToEvaluateOnNewDocument = ( + AsyncMock(return_value={"identifier": "script-1"}) + ) + + # First recording session + session1 = RecordingSession(output_dir=temp_dir) + await session1.start(mock_browser_session) + session_dir_1 = session1.session_dir + await session1.stop(mock_browser_session) + + # Small delay to ensure different timestamps + time.sleep(0.01) + + # Second recording session + session2 = RecordingSession(output_dir=temp_dir) + await session2.start(mock_browser_session) + session_dir_2 = session2.session_dir + await session2.stop(mock_browser_session) + + # Verify: Two separate subfolders were created + subdirs = [ + d + for d in os.listdir(temp_dir) + if os.path.isdir(os.path.join(temp_dir, d)) + ] + assert len(subdirs) == 2, ( + f"Expected 2 recording subfolders, got {len(subdirs)}: {subdirs}" + ) + + # Verify both start with "recording-" + for subdir in subdirs: + assert subdir.startswith("recording-"), ( + f"Expected subfolder to start with 'recording-', got {subdir}" + ) + + # Verify the session_dirs are different + assert session_dir_1 != session_dir_2, ( + "Expected different session directories for each recording" + ) + + # Verify each subfolder has its own files + for subdir in subdirs: + subdir_path = os.path.join(temp_dir, subdir) + files = os.listdir(subdir_path) + json_files = [f for f in files if f.endswith(".json")] + assert len(json_files) > 0, ( + f"Expected at least one JSON file in {subdir}" + ) + + +class TestFileCountAccuracy: + """Tests for accurate file count reporting.""" + + @pytest.mark.asyncio + async def test_file_count_accurate_with_existing_files(self): + """Test that file count is accurate when session_dir has existing files.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Pre-create some files to simulate existing recordings + for i in range(1, 4): # Create 1.json, 2.json, 3.json + with open(os.path.join(temp_dir, f"{i}.json"), "w") as f: + json.dump([{"type": "existing"}], f) + + session = RecordingSession() + session._storage._session_dir = temp_dir + session._is_recording = True + + # Add events to buffer and save twice + for _ in range(2): + session._events.extend(create_mock_events(20)) + session._save_and_clear_events() + + # Verify: file_count should be 2 (files written this session) + assert session.file_count == 2, ( + f"Expected file_count=2 (files written), got {session.file_count}" + ) + + # Verify new files were created (timestamps, not numbered) + files = sorted(os.listdir(temp_dir)) + json_files = [f for f in files if f.endswith(".json")] + assert len(json_files) == 5 # 3 existing + 2 new + + @pytest.mark.asyncio + async def test_file_count_zero_when_no_events(self): + """Test that file count is 0 when no events are recorded.""" + with tempfile.TemporaryDirectory() as temp_dir: + session = RecordingSession() + session._storage._session_dir = temp_dir + session._is_recording = True + + # No flush calls, no events + assert session.file_count == 0 + + @pytest.mark.asyncio + async def test_file_count_matches_actual_files_written(self): + """Test that file_count exactly matches number of files written.""" + with tempfile.TemporaryDirectory() as temp_dir: + session = RecordingSession() + session._storage._session_dir = temp_dir + session._is_recording = True + + # Add events to buffer and save 5 times + for _ in range(5): + session._events.extend(create_mock_events(20)) + session._save_and_clear_events() + + # Verify file_count matches actual files + files = os.listdir(temp_dir) + json_files = [f for f in files if f.endswith(".json")] + assert session.file_count == len(json_files) == 5