diff --git a/.semversioner/next-release/patch-20260211221626814603.json b/.semversioner/next-release/patch-20260211221626814603.json
new file mode 100644
index 000000000..9f2e9d2db
--- /dev/null
+++ b/.semversioner/next-release/patch-20260211221626814603.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "add profiling to get memory usage"
+}
diff --git a/packages/graphrag/graphrag/index/run/profiling.py b/packages/graphrag/graphrag/index/run/profiling.py
new file mode 100644
index 000000000..bac7aa5be
--- /dev/null
+++ b/packages/graphrag/graphrag/index/run/profiling.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2025 Microsoft
+# Licensed under the MIT License
+
+"""Workflow profiling utilities."""
+
+import time
+import tracemalloc
+from types import TracebackType
+from typing import Self
+
+from graphrag.index.typing.stats import WorkflowMetrics
+
+
+class WorkflowProfiler:
+    """Context manager for profiling workflow execution.
+
+    Captures timing and memory metrics using tracemalloc. Designed to wrap
+    workflow execution in run_pipeline with minimal code intrusion.
+
+    Example
+    -------
+    with WorkflowProfiler() as profiler:
+        result = await workflow_function(config, context)
+    metrics = profiler.metrics
+    """
+
+    def __init__(self) -> None:
+        self._start_time: float = 0.0
+        self._elapsed: float = 0.0
+        self._peak_memory: int = 0
+        self._current_memory: int = 0
+        self._tracemalloc_overhead: int = 0
+
+    def __enter__(self) -> Self:
+        """Start profiling: begin tracemalloc and record start time."""
+        tracemalloc.start()
+        self._start_time = time.time()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        """Stop profiling: capture metrics and stop tracemalloc."""
+        self._elapsed = time.time() - self._start_time
+        self._current_memory, self._peak_memory = tracemalloc.get_traced_memory()
+        self._tracemalloc_overhead = tracemalloc.get_tracemalloc_memory()
+        tracemalloc.stop()
+
+    @property
+    def metrics(self) -> WorkflowMetrics:
+        """Return collected metrics as a WorkflowMetrics dataclass."""
+        return WorkflowMetrics(
+            overall=self._elapsed,
+            peak_memory_bytes=self._peak_memory,
+            memory_delta_bytes=self._current_memory,
+            tracemalloc_overhead_bytes=self._tracemalloc_overhead,
+        )
diff --git a/packages/graphrag/graphrag/index/run/run_pipeline.py b/packages/graphrag/graphrag/index/run/run_pipeline.py
index 401f067a3..7f4c45f37 100644
--- a/packages/graphrag/graphrag/index/run/run_pipeline.py
+++ b/packages/graphrag/graphrag/index/run/run_pipeline.py
@@ -18,6 +18,7 @@
 from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
 from graphrag.config.models.graph_rag_config import GraphRagConfig
+from graphrag.index.run.profiling import WorkflowProfiler
 from graphrag.index.run.utils import create_run_context
 from graphrag.index.typing.context import PipelineRunContext
 from graphrag.index.typing.pipeline import Pipeline
@@ -127,13 +128,15 @@ async def _run_pipeline(
     for name, workflow_function in pipeline.run():
         last_workflow = name
         context.callbacks.workflow_start(name, None)
-        work_time = time.time()
-        result = await workflow_function(config, context)
+
+        with WorkflowProfiler() as profiler:
+            result = await workflow_function(config, context)
+
         context.callbacks.workflow_end(name, result)
         yield PipelineRunResult(
             workflow=name, result=result.result, state=context.state, error=None
         )
-        context.stats.workflows[name] = {"overall": time.time() - work_time}
+        context.stats.workflows[name] = profiler.metrics
         if result.stop:
             logger.info("Halting pipeline at workflow request")
             break
diff --git a/packages/graphrag/graphrag/index/typing/stats.py b/packages/graphrag/graphrag/index/typing/stats.py
index 271773600..cade3198a 100644
--- a/packages/graphrag/graphrag/index/typing/stats.py
+++ b/packages/graphrag/graphrag/index/typing/stats.py
@@ -6,6 +6,23 @@
 from dataclasses import dataclass, field
 
 
+@dataclass
+class WorkflowMetrics:
+    """Metrics collected for a single workflow execution."""
+
+    overall: float
+    """Wall-clock time in seconds."""
+
+    peak_memory_bytes: int
+    """Peak memory usage during workflow execution (tracemalloc)."""
+
+    memory_delta_bytes: int
+    """Net memory change after workflow completion (tracemalloc)."""
+
+    tracemalloc_overhead_bytes: int
+    """Memory used by tracemalloc itself for tracking allocations."""
+
+
 @dataclass
 class PipelineRunStats:
     """Pipeline running stats."""
@@ -21,5 +38,5 @@ class PipelineRunStats:
     input_load_time: float = field(default=0)
     """Float representing the input load time."""
 
-    workflows: dict[str, dict[str, float]] = field(default_factory=dict)
-    """A dictionary of workflows."""
+    workflows: dict[str, WorkflowMetrics] = field(default_factory=dict)
+    """Metrics for each workflow execution."""
diff --git a/tests/unit/indexing/test_profiling.py b/tests/unit/indexing/test_profiling.py
new file mode 100644
index 000000000..ae57f3fa4
--- /dev/null
+++ b/tests/unit/indexing/test_profiling.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2025 Microsoft
+# Licensed under the MIT License
+
+"""Unit tests for WorkflowProfiler."""
+
+import time
+
+from graphrag.index.run.profiling import WorkflowProfiler
+from graphrag.index.typing.stats import WorkflowMetrics
+
+
+class TestWorkflowProfiler:
+    """Tests for the WorkflowProfiler context manager."""
+
+    def test_captures_time(self):
+        """Verify profiler captures elapsed time."""
+        with WorkflowProfiler() as profiler:
+            time.sleep(0.05)  # Sleep 50ms
+
+        metrics = profiler.metrics
+        assert metrics.overall >= 0.05
+        assert metrics.overall < 0.5  # Should not take too long
+
+    def test_captures_peak_memory(self):
+        """Verify profiler captures peak memory from allocations."""
+        with WorkflowProfiler() as profiler:
+            # Allocate ~1MB of data
+            data = [0] * (1024 * 1024 // 8)  # 131072 list slots x 8 bytes ≈ 1MB on 64-bit
+            _ = data  # Keep reference to prevent GC
+
+        metrics = profiler.metrics
+        assert metrics.peak_memory_bytes > 0
+
+    def test_captures_memory_delta(self):
+        """Verify profiler captures memory delta (current allocation)."""
+        with WorkflowProfiler() as profiler:
+            _data = [0] * 10000  # Keep allocation in scope
+
+        metrics = profiler.metrics
+        # Memory delta should be non-negative
+        assert metrics.memory_delta_bytes >= 0
+
+    def test_captures_tracemalloc_overhead(self):
+        """Verify profiler captures tracemalloc's own memory overhead."""
+        with WorkflowProfiler() as profiler:
+            _ = list(range(1000))
+
+        metrics = profiler.metrics
+        assert metrics.tracemalloc_overhead_bytes > 0
+
+    def test_returns_workflow_metrics_dataclass(self):
+        """Verify profiler.metrics returns a WorkflowMetrics instance."""
+        with WorkflowProfiler() as profiler:
+            pass
+
+        metrics = profiler.metrics
+        assert isinstance(metrics, WorkflowMetrics)
+
+    def test_all_metrics_populated(self):
+        """Verify all four metrics are populated after profiling."""
+        with WorkflowProfiler() as profiler:
+            _ = list(range(100))
+
+        metrics = profiler.metrics
+        assert metrics.overall >= 0
+        assert metrics.peak_memory_bytes >= 0
+        assert metrics.memory_delta_bytes >= 0
+        assert metrics.tracemalloc_overhead_bytes >= 0
+
+    def test_handles_exception_in_context(self):
+        """Verify profiler captures metrics even when exception is raised."""
+        profiler: WorkflowProfiler | None = None
+        try:
+            with WorkflowProfiler() as profiler:
+                _ = [0] * 1000
+                msg = "Test exception"
+                raise ValueError(msg)
+        except ValueError:
+            pass
+
+        assert profiler is not None
+        metrics = profiler.metrics
+        assert metrics.overall > 0
+        assert metrics.peak_memory_bytes > 0
+
+    def test_multiple_profilers_independent(self):
+        """Verify multiple profiler instances don't interfere."""
+        with WorkflowProfiler() as profiler1:
+            time.sleep(0.02)
+
+        with WorkflowProfiler() as profiler2:
+            time.sleep(0.04)
+
+        # profiler2 should have longer time
+        assert profiler2.metrics.overall > profiler1.metrics.overall
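
Usage sketch (not part of the diff above): a minimal, self-contained example of how the new WorkflowProfiler and WorkflowMetrics are meant to be consumed, wrapping work the same way _run_pipeline does. The do_work function and the print formatting are illustrative assumptions; only the WorkflowProfiler API and the WorkflowMetrics fields come from this change.

# Illustrative consumer code (assumed), profiling a synthetic workload.
from graphrag.index.run.profiling import WorkflowProfiler


def do_work() -> list[int]:
    """Hypothetical stand-in for a workflow function."""
    return [i * i for i in range(100_000)]


with WorkflowProfiler() as profiler:
    do_work()

# Read the collected metrics after the context manager exits.
metrics = profiler.metrics
print(f"overall: {metrics.overall:.3f} s")
print(f"peak memory: {metrics.peak_memory_bytes / 1024:.1f} KiB")
print(f"memory delta: {metrics.memory_delta_bytes / 1024:.1f} KiB")
print(f"tracemalloc overhead: {metrics.tracemalloc_overhead_bytes / 1024:.1f} KiB")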