diff --git a/src/crawlee/_autoscaling/_types.py b/src/crawlee/_autoscaling/_types.py index 030e8985fe..b231c9062d 100644 --- a/src/crawlee/_autoscaling/_types.py +++ b/src/crawlee/_autoscaling/_types.py @@ -8,6 +8,9 @@ from crawlee._utils.byte_size import ByteSize +SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97 + + @dataclass class LoadRatioInfo: """Represent the load ratio of a resource.""" @@ -91,9 +94,15 @@ class MemorySnapshot: current_size: ByteSize """Memory usage of the current Python process and its children.""" + system_wide_used_size: ByteSize | None + """Memory usage of all processes, system-wide.""" + max_memory_size: ByteSize """The maximum memory that can be used by `AutoscaledPool`.""" + system_wide_memory_size: ByteSize | None + """Total memory available in the whole system.""" + max_used_memory_ratio: float """The maximum acceptable ratio of `current_size` to `max_memory_size`.""" @@ -103,6 +112,11 @@ class MemorySnapshot: @property def is_overloaded(self) -> bool: """Indicate whether the memory is considered as overloaded.""" + if self.system_wide_memory_size is not None and self.system_wide_used_size is not None: + system_wide_utilization = self.system_wide_used_size / self.system_wide_memory_size + if system_wide_utilization > SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD: + return True + return (self.current_size / self.max_memory_size) > self.max_used_memory_ratio diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 9502038146..82babb3fab 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -15,7 +15,7 @@ from crawlee._utils.context import ensure_context from crawlee._utils.docs import docs_group from crawlee._utils.recurring_task import RecurringTask -from crawlee._utils.system import get_memory_info +from crawlee._utils.system import MemoryInfo, get_memory_info from crawlee.events._types import Event, EventSystemInfoData if TYPE_CHECKING: @@ -273,8 +273,14 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: max_memory_size=self._max_memory_size, max_used_memory_ratio=self._max_used_memory_ratio, created_at=event_data.memory_info.created_at, + system_wide_used_size=None, + system_wide_memory_size=None, ) + if isinstance(memory_info := event_data.memory_info, MemoryInfo): + snapshot.system_wide_used_size = memory_info.system_wide_used_size + snapshot.system_wide_memory_size = memory_info.total_size + snapshots = cast('list[Snapshot]', self._memory_snapshots) self._prune_snapshots(snapshots, snapshot.created_at) self._memory_snapshots.add(snapshot) diff --git a/src/crawlee/_utils/system.py b/src/crawlee/_utils/system.py index c3c3261e88..25fe021f20 100644 --- a/src/crawlee/_utils/system.py +++ b/src/crawlee/_utils/system.py @@ -59,6 +59,14 @@ class MemoryInfo(MemoryUsageInfo): ] """Total memory available in the system.""" + system_wide_used_size: Annotated[ + ByteSize, + PlainValidator(ByteSize.validate), + PlainSerializer(lambda size: size.bytes), + Field(alias='systemWideUsedSize'), + ] + """Total memory used by all processes system-wide (including non-crawlee processes).""" + def get_cpu_info() -> CpuInfo: """Retrieve the current CPU usage. @@ -89,11 +97,12 @@ def get_memory_info() -> MemoryInfo: with suppress(psutil.NoSuchProcess): current_size_bytes += _get_used_memory(child.memory_full_info()) - total_size_bytes = psutil.virtual_memory().total + vm = psutil.virtual_memory() return MemoryInfo( - total_size=ByteSize(total_size_bytes), + total_size=ByteSize(vm.total), current_size=ByteSize(current_size_bytes), + system_wide_used_size=ByteSize(vm.total - vm.available), ) diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index 667cc5a8f1..5cb0b63d67 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -29,6 +29,7 @@ def event_system_data_info() -> EventSystemInfoData: memory_info=MemoryInfo( total_size=ByteSize.from_gb(8), current_size=ByteSize.from_gb(4), + system_wide_used_size=ByteSize.from_gb(5), ), ) @@ -52,6 +53,28 @@ def test_snapshot_memory(snapshotter: Snapshotter, event_system_data_info: Event assert snapshotter._memory_snapshots[0].current_size == event_system_data_info.memory_info.current_size +def test_snapshot_memory_with_memory_info_sets_system_wide_fields(snapshotter: Snapshotter) -> None: + memory_info = MemoryInfo( + total_size=ByteSize.from_gb(16), + current_size=ByteSize.from_gb(4), + system_wide_used_size=ByteSize.from_gb(12), + ) + + event_data = EventSystemInfoData( + cpu_info=CpuInfo(used_ratio=0.5), + memory_info=memory_info, + ) + + snapshotter._snapshot_memory(event_data) + + assert len(snapshotter._memory_snapshots) == 1 + memory_snapshot = snapshotter._memory_snapshots[0] + + # Test that system-wide fields are properly set + assert memory_snapshot.system_wide_used_size == memory_info.system_wide_used_size + assert memory_snapshot.system_wide_memory_size == memory_info.total_size + + def test_snapshot_event_loop(snapshotter: Snapshotter) -> None: snapshotter._event_loop_snapshots = Snapshotter._get_sorted_list_by_created_at( [ @@ -254,7 +277,10 @@ def create_event_data(creation_time: datetime) -> EventSystemInfoData: return EventSystemInfoData( cpu_info=CpuInfo(used_ratio=0.5, created_at=creation_time), memory_info=MemoryInfo( - current_size=ByteSize(bytes=1), created_at=creation_time, total_size=ByteSize(bytes=2) + current_size=ByteSize(bytes=1), + created_at=creation_time, + total_size=ByteSize(bytes=2), + system_wide_used_size=ByteSize.from_gb(5), ), ) diff --git a/tests/unit/_autoscaling/test_system_status.py b/tests/unit/_autoscaling/test_system_status.py index 86eb01471f..ce54be4630 100644 --- a/tests/unit/_autoscaling/test_system_status.py +++ b/tests/unit/_autoscaling/test_system_status.py @@ -102,24 +102,32 @@ def test_get_system_info(snapshotter: Snapshotter, now: datetime) -> None: max_memory_size=ByteSize.from_gb(12), max_used_memory_ratio=0.8, created_at=now - timedelta(seconds=90), + system_wide_used_size=None, + system_wide_memory_size=None, ), MemorySnapshot( current_size=ByteSize.from_gb(7), max_memory_size=ByteSize.from_gb(8), max_used_memory_ratio=0.8, created_at=now - timedelta(seconds=60), + system_wide_used_size=None, + system_wide_memory_size=None, ), MemorySnapshot( current_size=ByteSize.from_gb(28), max_memory_size=ByteSize.from_gb(30), max_used_memory_ratio=0.8, created_at=now - timedelta(seconds=30), + system_wide_used_size=None, + system_wide_memory_size=None, ), MemorySnapshot( current_size=ByteSize.from_gb(48), max_memory_size=ByteSize.from_gb(60), max_used_memory_ratio=0.8, created_at=now, + system_wide_used_size=None, + system_wide_memory_size=None, ), ] ) @@ -204,3 +212,42 @@ def test_client_overloaded( # Ratio of overloaded snapshots is 2/3 (2 minutes out of 3) assert system_status._is_client_overloaded().is_overloaded == is_overloaded + + +def test_memory_overloaded_system_wide(snapshotter: Snapshotter, now: datetime) -> None: + """Test that system-wide memory overload is detected when system-wide memory utilization exceeds threshold.""" + system_status = SystemStatus( + snapshotter, + max_snapshot_age=timedelta(minutes=1), + memory_overload_threshold=0.5, # Set high threshold so process memory won't trigger overload + ) + + # Add memory snapshots with system-wide memory usage above threshold (97%) + system_status._snapshotter._memory_snapshots = Snapshotter._get_sorted_list_by_created_at( + [ + MemorySnapshot( + current_size=ByteSize.from_gb(1), # Process memory is low + max_memory_size=ByteSize.from_gb(8), # Max memory is high + max_used_memory_ratio=0.8, # Ratio is fine + created_at=now - timedelta(minutes=1), + system_wide_used_size=ByteSize.from_gb(31), # System-wide used is high + system_wide_memory_size=ByteSize.from_gb(32), # System-wide total (31/32 = 96.875% < 97%) + ), + MemorySnapshot( + current_size=ByteSize.from_gb(1), # Process memory is low + max_memory_size=ByteSize.from_gb(8), # Max memory is high + max_used_memory_ratio=0.8, # Ratio is fine + created_at=now, + system_wide_used_size=ByteSize.from_gb(31.5), # System-wide used is high + system_wide_memory_size=ByteSize.from_gb(32), # System-wide total (31.5/32 = 98.4% > 97%) + ), + ] + ) + + memory_info = system_status._is_memory_overloaded() + + # Should be overloaded due to system-wide memory usage exceeding 97% threshold + assert memory_info.is_overloaded is True + # The actual ratio should be 1.0 (the entire time period from first to second snapshot is overloaded) + assert memory_info.actual_ratio == 1.0 + assert memory_info.limit_ratio == 0.5 diff --git a/uv.lock b/uv.lock index ea78cf175e..471b3f9676 100644 --- a/uv.lock +++ b/uv.lock @@ -1209,7 +1209,7 @@ name = "importlib-metadata" version = "8.7.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "zipp" }, + { name = "zipp", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/76/66/650a33bd90f786193e4de4b3ad86ea60b53c89b669a5c7be931fac31cdb0/importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000", size = 56641, upload-time = "2025-04-27T15:29:01.736Z" } wheels = [