Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260211221626814603.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "add profiling to get memory usage"
}
60 changes: 60 additions & 0 deletions packages/graphrag/graphrag/index/run/profiling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright (C) 2025 Microsoft
# Licensed under the MIT License

"""Workflow profiling utilities."""

import time
import tracemalloc
from types import TracebackType
from typing import Self

from graphrag.index.typing.stats import WorkflowMetrics


class WorkflowProfiler:
"""Context manager for profiling workflow execution.

Captures timing and memory metrics using tracemalloc. Designed to wrap
workflow execution in run_pipeline with minimal code intrusion.

Example
-------
with WorkflowProfiler() as profiler:
result = await workflow_function(config, context)
metrics = profiler.metrics
"""

def __init__(self) -> None:
self._start_time: float = 0.0
self._elapsed: float = 0.0
self._peak_memory: int = 0
self._current_memory: int = 0
self._tracemalloc_overhead: int = 0

def __enter__(self) -> Self:
"""Start profiling: begin tracemalloc and record start time."""
tracemalloc.start()
self._start_time = time.time()
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Stop profiling: capture metrics and stop tracemalloc."""
self._elapsed = time.time() - self._start_time
self._current_memory, self._peak_memory = tracemalloc.get_traced_memory()
self._tracemalloc_overhead = tracemalloc.get_tracemalloc_memory()
tracemalloc.stop()

@property
def metrics(self) -> WorkflowMetrics:
"""Return collected metrics as a WorkflowMetrics dataclass."""
return WorkflowMetrics(
overall=self._elapsed,
peak_memory_bytes=self._peak_memory,
memory_delta_bytes=self._current_memory,
tracemalloc_overhead_bytes=self._tracemalloc_overhead,
)
9 changes: 6 additions & 3 deletions packages/graphrag/graphrag/index/run/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.index.run.profiling import WorkflowProfiler
from graphrag.index.run.utils import create_run_context
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.pipeline import Pipeline
Expand Down Expand Up @@ -127,13 +128,15 @@ async def _run_pipeline(
for name, workflow_function in pipeline.run():
last_workflow = name
context.callbacks.workflow_start(name, None)
work_time = time.time()
result = await workflow_function(config, context)

with WorkflowProfiler() as profiler:
result = await workflow_function(config, context)

context.callbacks.workflow_end(name, result)
yield PipelineRunResult(
workflow=name, result=result.result, state=context.state, error=None
)
context.stats.workflows[name] = {"overall": time.time() - work_time}
context.stats.workflows[name] = profiler.metrics
if result.stop:
logger.info("Halting pipeline at workflow request")
break
Expand Down
21 changes: 19 additions & 2 deletions packages/graphrag/graphrag/index/typing/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@
from dataclasses import dataclass, field


@dataclass
class WorkflowMetrics:
    """Timing and memory measurements captured for one workflow run.

    Attributes
    ----------
    overall:
        Wall-clock duration of the workflow, in seconds.
    peak_memory_bytes:
        Highest traced memory usage observed while the workflow ran
        (as reported by tracemalloc).
    memory_delta_bytes:
        Traced memory still allocated once the workflow completed
        (as reported by tracemalloc).
    tracemalloc_overhead_bytes:
        Memory consumed by tracemalloc's own allocation tracking.
    """

    overall: float
    peak_memory_bytes: int
    memory_delta_bytes: int
    tracemalloc_overhead_bytes: int


@dataclass
class PipelineRunStats:
"""Pipeline running stats."""
Expand All @@ -21,5 +38,5 @@ class PipelineRunStats:
input_load_time: float = field(default=0)
"""Float representing the input load time."""

workflows: dict[str, dict[str, float]] = field(default_factory=dict)
"""A dictionary of workflows."""
workflows: dict[str, WorkflowMetrics] = field(default_factory=dict)
"""Metrics for each workflow execution."""
95 changes: 95 additions & 0 deletions tests/unit/indexing/test_profiling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright (C) 2025 Microsoft
# Licensed under the MIT License

"""Unit tests for WorkflowProfiler."""

import time

from graphrag.index.run.profiling import WorkflowProfiler
from graphrag.index.typing.stats import WorkflowMetrics


class TestWorkflowProfiler:
    """Tests for the WorkflowProfiler context manager."""

    def test_captures_time(self):
        """Verify profiler captures elapsed time."""
        with WorkflowProfiler() as profiler:
            time.sleep(0.05)  # Sleep 50ms

        metrics = profiler.metrics
        assert metrics.overall >= 0.05
        assert metrics.overall < 0.5  # Should not take too long

    def test_captures_peak_memory(self):
        """Verify profiler captures peak memory from allocations."""
        with WorkflowProfiler() as profiler:
            # Allocate ~1MB of data
            # (131072 pointer-sized list slots ≈ 1 MB on a 64-bit build)
            data = [0] * (1024 * 1024 // 8)
            _ = data  # Keep reference to prevent GC

        metrics = profiler.metrics
        assert metrics.peak_memory_bytes > 0

    def test_captures_memory_delta(self):
        """Verify profiler captures memory delta (current allocation)."""
        with WorkflowProfiler() as profiler:
            _data = [0] * 10000  # Keep allocation in scope

        metrics = profiler.metrics
        # Memory delta should be non-negative
        assert metrics.memory_delta_bytes >= 0

    def test_captures_tracemalloc_overhead(self):
        """Verify profiler captures tracemalloc's own memory overhead."""
        with WorkflowProfiler() as profiler:
            _ = list(range(1000))

        metrics = profiler.metrics
        assert metrics.tracemalloc_overhead_bytes > 0

    def test_returns_workflow_metrics_dataclass(self):
        """Verify profiler.metrics returns a WorkflowMetrics instance."""
        with WorkflowProfiler() as profiler:
            pass

        metrics = profiler.metrics
        assert isinstance(metrics, WorkflowMetrics)

    def test_all_metrics_populated(self):
        """Verify all four metrics are populated after profiling."""
        with WorkflowProfiler() as profiler:
            _ = list(range(100))

        metrics = profiler.metrics
        assert metrics.overall >= 0
        assert metrics.peak_memory_bytes >= 0
        assert metrics.memory_delta_bytes >= 0
        assert metrics.tracemalloc_overhead_bytes >= 0

    def test_handles_exception_in_context(self):
        """Verify profiler captures metrics even when exception is raised."""
        profiler: WorkflowProfiler | None = None
        try:
            with WorkflowProfiler() as profiler:
                _ = [0] * 1000
                msg = "Test exception"
                raise ValueError(msg)
        except ValueError:
            pass

        # __exit__ runs before the exception propagates, so metrics are set.
        assert profiler is not None
        metrics = profiler.metrics
        assert metrics.overall > 0
        assert metrics.peak_memory_bytes > 0

    def test_multiple_profilers_independent(self):
        """Verify multiple profiler instances don't interfere."""
        with WorkflowProfiler() as profiler1:
            time.sleep(0.02)

        with WorkflowProfiler() as profiler2:
            time.sleep(0.04)

        # profiler2 should have longer time
        assert profiler2.metrics.overall > profiler1.metrics.overall