5 changes: 1 addition & 4 deletions src/crawlee/crawlers/_basic/_basic_crawler.py

@@ -56,7 +56,7 @@
     SessionError,
     UserDefinedErrorHandlerError,
 )
-from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData
+from crawlee.events._types import Event, EventCrawlerStatusData
 from crawlee.http_clients import ImpitHttpClient
 from crawlee.router import Router
 from crawlee.sessions import SessionPool

@@ -751,9 +751,6 @@ async def _run_crawler(self) -> None:

         await self._autoscaled_pool.run()

-        # Emit PERSIST_STATE event when crawler is finishing to allow listeners to persist their state if needed
-        event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False))
-
     async def add_requests(
         self,
         requests: Sequence[str | Request],
4 changes: 3 additions & 1 deletion src/crawlee/events/_event_manager.py

@@ -130,11 +130,13 @@ async def __aexit__(
         if not self._active:
             raise RuntimeError(f'The {self.__class__.__name__} is not active.')

+        # Stop the periodic persist state emission and manually emit a final event to ensure the latest state is saved.
+        await self._emit_persist_state_event_rec_task.stop()
+        await self._emit_persist_state_event()
         await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
         self._event_emitter.remove_all_listeners()
         self._listener_tasks.clear()
         self._listeners_to_wrappers.clear()
-        await self._emit_persist_state_event_rec_task.stop()
         self._active = False

     @overload
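
Taken together with the _basic_crawler.py change above, the final PERSIST_STATE emission now happens inside `EventManager.__aexit__`, before pending listener tasks are awaited, instead of at the end of `BasicCrawler._run_crawler`. A minimal sketch of the resulting listener lifecycle; the `persist_state_interval` constructor keyword and the 30-second value are assumptions for illustration, not taken from this diff:

import asyncio
from datetime import timedelta

from crawlee.events import EventManager
from crawlee.events._types import Event, EventPersistStateData


async def main() -> None:
    # Assumed constructor keyword; the interval is illustrative.
    async with EventManager(persist_state_interval=timedelta(seconds=30)) as event_manager:

        def persist_listener(event_data: EventPersistStateData) -> None:
            # A real listener would write its state to the key-value store here.
            print(f'Persisting state (is_migrating={event_data.is_migrating}).')

        event_manager.on(event=Event.PERSIST_STATE, listener=persist_listener)
        # ... do some work; PERSIST_STATE fires periodically in the background ...

    # On exit, the periodic task is stopped, one final PERSIST_STATE is emitted,
    # and only then are outstanding listener tasks awaited, so the listener's
    # latest state is saved.


asyncio.run(main())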
33 changes: 32 additions & 1 deletion src/crawlee/statistics/_models.py

@@ -1,6 +1,7 @@
 from __future__ import annotations

 import json
+import warnings
 from dataclasses import asdict, dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Annotated, Any

@@ -76,7 +77,6 @@ class StatisticsState(BaseModel):
     crawler_started_at: Annotated[datetime | None, Field(alias='crawlerStartedAt')] = None
     crawler_last_started_at: Annotated[datetime | None, Field(alias='crawlerLastStartTimestamp')] = None
     crawler_finished_at: Annotated[datetime | None, Field(alias='crawlerFinishedAt')] = None
-    crawler_runtime: Annotated[timedelta_ms, Field(alias='crawlerRuntimeMillis')] = timedelta()
     errors: dict[str, Any] = Field(default_factory=dict)
     retry_errors: dict[str, Any] = Field(alias='retryErrors', default_factory=dict)
     requests_with_status_code: dict[str, int] = Field(alias='requestsWithStatusCode', default_factory=dict)

@@ -93,6 +93,37 @@ class StatisticsState(BaseModel):
         ),
     ] = {}

+    # Tracks the crawler runtime that has already been persisted, i.e. the runtime from previous runs.
+    _runtime_offset: Annotated[timedelta, Field(exclude=True)] = timedelta()
+
+    def model_post_init(self, /, __context: Any) -> None:
+        self._runtime_offset = self.crawler_runtime or self._runtime_offset
+
+    @property
+    def crawler_runtime(self) -> timedelta:
+        if self.crawler_last_started_at:
+            finished_at = self.crawler_finished_at or datetime.now(timezone.utc)
+            return self._runtime_offset + finished_at - self.crawler_last_started_at
+        return self._runtime_offset
+
+    @crawler_runtime.setter
+    def crawler_runtime(self, value: timedelta) -> None:
+        # Setter kept for backwards compatibility only; `crawler_runtime` is now a computed field and can't be
+        # set manually. To be removed in the v2 release: https://github.com/apify/crawlee-python/issues/1567
+        warnings.warn(
+            f"Setting 'crawler_runtime' is deprecated and will be removed in a future version."
+            f' Value {value} will not be used.',
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+    @computed_field(alias='crawlerRuntimeMillis')
+    def crawler_runtime_for_serialization(self) -> timedelta:
+        if self.crawler_last_started_at:
+            finished_at = self.crawler_finished_at or datetime.now(timezone.utc)
+            return self._runtime_offset + finished_at - self.crawler_last_started_at
+        return self._runtime_offset
+
     @computed_field(alias='requestTotalDurationMillis', return_type=timedelta_ms)  # type: ignore[prop-decorator]
     @property
     def request_total_duration(self) -> timedelta:
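
The net effect is that `crawler_runtime` stops being a stored field: it is derived from the persisted offset plus the current run's timestamps, and the retained setter only warns. A short sketch of the backwards-compatibility behaviour, assuming `StatisticsState` is re-exported from `crawlee.statistics`:

import warnings
from datetime import timedelta

from crawlee.statistics import StatisticsState

state = StatisticsState()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    # Legacy assignment: no longer changes the value, only warns.
    state.crawler_runtime = timedelta(seconds=5)

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
# With no run timestamps set, the computed runtime is just the (zero) offset.
assert state.crawler_runtime == timedelta()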
23 changes: 2 additions & 21 deletions src/crawlee/statistics/_statistics.py

@@ -110,9 +110,6 @@ def __init__(
         # Flag to indicate the context state.
         self._active = False

-        # Pre-existing runtime offset, that can be non-zero when restoring serialized state from KVS.
-        self._runtime_offset = timedelta(seconds=0)
-
     def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]:
         """Create near copy of the `Statistics` with replaced `state_model`."""
         new_statistics: Statistics[TNewStatisticsState] = Statistics(

@@ -168,8 +165,8 @@ async def __aenter__(self) -> Self:
             raise RuntimeError(f'The {self.__class__.__name__} is already active.')

         await self._state.initialize()
-
-        self._runtime_offset = self.state.crawler_runtime
+        # Reset `crawler_finished_at` to indicate a new run in progress.
+        self.state.crawler_finished_at = None

         # Start periodic logging and let it print initial state before activation.
         self._periodic_logger.start()

@@ -200,10 +197,6 @@
         # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime
         await self._periodic_logger.stop()
         self.state.crawler_finished_at = datetime.now(timezone.utc)
-        self.state.crawler_runtime = (
-            self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at
-        )
-
         self._active = False
         await self._state.teardown()

@@ -262,20 +255,8 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None:

         del self._requests_in_progress[request_id_or_key]

-    def _update_crawler_runtime(self) -> None:
-        current_run_duration = (
-            (datetime.now(timezone.utc) - self.state.crawler_last_started_at)
-            if self.state.crawler_last_started_at
-            else timedelta()
-        )
-        self.state.crawler_runtime = current_run_duration + self._runtime_offset
-
     def calculate(self) -> FinalStatistics:
         """Calculate the current statistics."""
-        if self._active:
-            # Only update state when active. If not, just report the last known runtime.
-            self._update_crawler_runtime()
-
         total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
         serialized_state = state.model_dump(by_alias=False)
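
With the stored field gone, `Statistics` no longer assigns the runtime on exit or inside `calculate()`; both simply read the computed value. A sketch of the observable behaviour, assuming the pre-existing `Statistics.with_default_state()` factory and that `FinalStatistics` exposes `crawler_runtime` (neither is shown in this diff):

import asyncio

from crawlee.statistics import Statistics


async def main() -> None:
    async with Statistics.with_default_state() as statistics:
        await asyncio.sleep(0.1)
        # While active, crawler_runtime is computed on demand from
        # crawler_last_started_at and the current time.
        print(statistics.state.crawler_runtime)

    # After exit, crawler_finished_at is set, so the runtime is pinned and
    # repeated calculate() calls report a stable value.
    print(statistics.calculate().crawler_runtime)


asyncio.run(main())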
1 change: 0 additions & 1 deletion tests/unit/crawlers/_basic/test_basic_crawler.py

@@ -1673,7 +1673,6 @@ def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
     return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))


-@pytest.mark.skip(reason='This test is flaky, see https://github.com/apify/crawlee-python/issues/1560.')
 async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
     """Test that crawler statistics persist and are loaded correctly.
12 changes: 12 additions & 0 deletions tests/unit/events/test_event_manager.py

@@ -5,6 +5,7 @@
 from datetime import timedelta
 from functools import update_wrapper
 from typing import TYPE_CHECKING, Any
+from unittest import mock
 from unittest.mock import AsyncMock, MagicMock

 import pytest

@@ -207,3 +208,14 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: EventSystemInfoData) -> None:
     await event_manager.wait_for_all_listeners_to_complete()

     assert event_manager.active is True
+
+
+async def test_event_manager_in_context_persistence() -> None:
+    """Test that using the `EventManager` context emits the persist state event at least once."""
+    event_manager = EventManager()
+
+    with mock.patch.object(event_manager, '_emit_persist_state_event', AsyncMock()) as mocked_emit_persist_state_event:
+        async with event_manager:
+            pass
+
+    assert mocked_emit_persist_state_event.call_count >= 1