Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/crawlee/_autoscaling/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from crawlee._utils.byte_size import ByteSize


# Ratio of system-wide used memory to total system memory above which `MemorySnapshot.is_overloaded` reports an
# overload, independently of the process-level `max_used_memory_ratio` check.
SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97


@dataclass
class LoadRatioInfo:
"""Represent the load ratio of a resource."""
Expand Down Expand Up @@ -91,9 +94,15 @@ class MemorySnapshot:
current_size: ByteSize
"""Memory usage of the current Python process and its children."""

system_wide_used_size: ByteSize | None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe these two new values could default to None? I see they are initialized with None anyway, both in the code and in the tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I already updated all the call sites... and I'd need to reorder the fields in the dataclass ☹️ I'd prefer to leave it this way.

"""Memory usage of all processes, system-wide."""

max_memory_size: ByteSize
"""The maximum memory that can be used by `AutoscaledPool`."""

system_wide_memory_size: ByteSize | None
"""Total memory available in the whole system."""

max_used_memory_ratio: float
"""The maximum acceptable ratio of `current_size` to `max_memory_size`."""

Expand All @@ -103,6 +112,11 @@ class MemorySnapshot:
@property
def is_overloaded(self) -> bool:
    """Tell whether this snapshot counts as a memory overload.

    The snapshot is overloaded when either of the following holds:

    - both system-wide figures are known and the ratio of `system_wide_used_size` to `system_wide_memory_size`
      exceeds `SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD`;
    - the ratio of `current_size` to `max_memory_size` exceeds `max_used_memory_ratio`.
    """
    system_total = self.system_wide_memory_size
    system_used = self.system_wide_used_size

    # The system-wide check only applies when both values were provided with the snapshot.
    if (
        system_total is not None
        and system_used is not None
        and system_used / system_total > SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD
    ):
        return True

    # Otherwise fall back to the process-level check against the configured limit.
    process_ratio = self.current_size / self.max_memory_size
    return process_ratio > self.max_used_memory_ratio


Expand Down
8 changes: 7 additions & 1 deletion src/crawlee/_autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from crawlee._utils.context import ensure_context
from crawlee._utils.docs import docs_group
from crawlee._utils.recurring_task import RecurringTask
from crawlee._utils.system import get_memory_info
from crawlee._utils.system import MemoryInfo, get_memory_info
from crawlee.events._types import Event, EventSystemInfoData

if TYPE_CHECKING:
Expand Down Expand Up @@ -273,8 +273,14 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
max_memory_size=self._max_memory_size,
max_used_memory_ratio=self._max_used_memory_ratio,
created_at=event_data.memory_info.created_at,
system_wide_used_size=None,
system_wide_memory_size=None,
)

if isinstance(memory_info := event_data.memory_info, MemoryInfo):
snapshot.system_wide_used_size = memory_info.system_wide_used_size
snapshot.system_wide_memory_size = memory_info.total_size

snapshots = cast('list[Snapshot]', self._memory_snapshots)
self._prune_snapshots(snapshots, snapshot.created_at)
self._memory_snapshots.add(snapshot)
Expand Down
13 changes: 11 additions & 2 deletions src/crawlee/_utils/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ class MemoryInfo(MemoryUsageInfo):
]
"""Total memory available in the system."""

system_wide_used_size: Annotated[
ByteSize,
PlainValidator(ByteSize.validate),
PlainSerializer(lambda size: size.bytes),
Field(alias='systemWideUsedSize'),
]
"""Total memory used by all processes system-wide (including non-crawlee processes)."""


def get_cpu_info() -> CpuInfo:
"""Retrieve the current CPU usage.
Expand Down Expand Up @@ -89,11 +97,12 @@ def get_memory_info() -> MemoryInfo:
with suppress(psutil.NoSuchProcess):
current_size_bytes += _get_used_memory(child.memory_full_info())

total_size_bytes = psutil.virtual_memory().total
vm = psutil.virtual_memory()

return MemoryInfo(
total_size=ByteSize(total_size_bytes),
total_size=ByteSize(vm.total),
current_size=ByteSize(current_size_bytes),
system_wide_used_size=ByteSize(vm.total - vm.available),
)


Expand Down
28 changes: 27 additions & 1 deletion tests/unit/_autoscaling/test_snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def event_system_data_info() -> EventSystemInfoData:
memory_info=MemoryInfo(
total_size=ByteSize.from_gb(8),
current_size=ByteSize.from_gb(4),
system_wide_used_size=ByteSize.from_gb(5),
),
)

Expand All @@ -52,6 +53,28 @@ def test_snapshot_memory(snapshotter: Snapshotter, event_system_data_info: Event
assert snapshotter._memory_snapshots[0].current_size == event_system_data_info.memory_info.current_size


def test_snapshot_memory_with_memory_info_sets_system_wide_fields(snapshotter: Snapshotter) -> None:
    """Check that a `MemoryInfo` payload populates the system-wide fields of the stored memory snapshot."""
    memory_info = MemoryInfo(
        total_size=ByteSize.from_gb(16),
        current_size=ByteSize.from_gb(4),
        system_wide_used_size=ByteSize.from_gb(12),
    )

    snapshotter._snapshot_memory(
        EventSystemInfoData(
            cpu_info=CpuInfo(used_ratio=0.5),
            memory_info=memory_info,
        )
    )

    stored_snapshots = snapshotter._memory_snapshots
    assert len(stored_snapshots) == 1

    # The snapshot must mirror the system-wide figures carried by the event's memory info.
    stored = stored_snapshots[0]
    assert stored.system_wide_used_size == memory_info.system_wide_used_size
    assert stored.system_wide_memory_size == memory_info.total_size


def test_snapshot_event_loop(snapshotter: Snapshotter) -> None:
snapshotter._event_loop_snapshots = Snapshotter._get_sorted_list_by_created_at(
[
Expand Down Expand Up @@ -254,7 +277,10 @@ def create_event_data(creation_time: datetime) -> EventSystemInfoData:
return EventSystemInfoData(
cpu_info=CpuInfo(used_ratio=0.5, created_at=creation_time),
memory_info=MemoryInfo(
current_size=ByteSize(bytes=1), created_at=creation_time, total_size=ByteSize(bytes=2)
current_size=ByteSize(bytes=1),
created_at=creation_time,
total_size=ByteSize(bytes=2),
system_wide_used_size=ByteSize.from_gb(5),
),
)

Expand Down
47 changes: 47 additions & 0 deletions tests/unit/_autoscaling/test_system_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,32 @@ def test_get_system_info(snapshotter: Snapshotter, now: datetime) -> None:
max_memory_size=ByteSize.from_gb(12),
max_used_memory_ratio=0.8,
created_at=now - timedelta(seconds=90),
system_wide_used_size=None,
system_wide_memory_size=None,
),
MemorySnapshot(
current_size=ByteSize.from_gb(7),
max_memory_size=ByteSize.from_gb(8),
max_used_memory_ratio=0.8,
created_at=now - timedelta(seconds=60),
system_wide_used_size=None,
system_wide_memory_size=None,
),
MemorySnapshot(
current_size=ByteSize.from_gb(28),
max_memory_size=ByteSize.from_gb(30),
max_used_memory_ratio=0.8,
created_at=now - timedelta(seconds=30),
system_wide_used_size=None,
system_wide_memory_size=None,
),
MemorySnapshot(
current_size=ByteSize.from_gb(48),
max_memory_size=ByteSize.from_gb(60),
max_used_memory_ratio=0.8,
created_at=now,
system_wide_used_size=None,
system_wide_memory_size=None,
),
]
)
Expand Down Expand Up @@ -204,3 +212,42 @@ def test_client_overloaded(

# Ratio of overloaded snapshots is 2/3 (2 minutes out of 3)
assert system_status._is_client_overloaded().is_overloaded == is_overloaded


def test_memory_overloaded_system_wide(snapshotter: Snapshotter, now: datetime) -> None:
    """Check that memory is reported as overloaded when system-wide utilization exceeds the 97% threshold.

    Process-level usage is kept well below its own limit (1 GB of 8 GB with a 0.8 ratio) in both snapshots, so any
    overload reported here can only come from the system-wide check.
    """
    system_status = SystemStatus(
        snapshotter,
        max_snapshot_age=timedelta(minutes=1),
        memory_overload_threshold=0.5,  # Expected to be echoed back as `limit_ratio` below
    )

    def build_snapshot(created_at: datetime, system_wide_used: ByteSize) -> MemorySnapshot:
        # Only the timestamp and the system-wide used size vary between the snapshots.
        return MemorySnapshot(
            current_size=ByteSize.from_gb(1),
            max_memory_size=ByteSize.from_gb(8),
            max_used_memory_ratio=0.8,
            created_at=created_at,
            system_wide_used_size=system_wide_used,
            system_wide_memory_size=ByteSize.from_gb(32),
        )

    system_status._snapshotter._memory_snapshots = Snapshotter._get_sorted_list_by_created_at(
        [
            # 31/32 = 96.875%, just below the 97% threshold
            build_snapshot(now - timedelta(minutes=1), ByteSize.from_gb(31)),
            # 31.5/32 = 98.4375%, above the 97% threshold
            build_snapshot(now, ByteSize.from_gb(31.5)),
        ]
    )

    load_info = system_status._is_memory_overloaded()

    # Overload must be reported even though process-level usage is within its limit
    assert load_info.is_overloaded is True
    # The interval between the two snapshots is expected to count as fully overloaded
    assert load_info.actual_ratio == 1.0
    assert load_info.limit_ratio == 0.5
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading