diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py
index b5e2e5f0aa..74d2aaff13 100644
--- a/src/crawlee/crawlers/_basic/_basic_crawler.py
+++ b/src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -56,7 +56,7 @@
     SessionError,
     UserDefinedErrorHandlerError,
 )
-from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData
+from crawlee.events._types import Event, EventCrawlerStatusData
 from crawlee.http_clients import ImpitHttpClient
 from crawlee.router import Router
 from crawlee.sessions import SessionPool
@@ -751,9 +751,6 @@ async def _run_crawler(self) -> None:
 
         await self._autoscaled_pool.run()
 
-        # Emit PERSIST_STATE event when crawler is finishing to allow listeners to persist their state if needed
-        event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False))
-
     async def add_requests(
         self,
         requests: Sequence[str | Request],
diff --git a/src/crawlee/events/_event_manager.py b/src/crawlee/events/_event_manager.py
index 4d6936e71d..65a41dd0d0 100644
--- a/src/crawlee/events/_event_manager.py
+++ b/src/crawlee/events/_event_manager.py
@@ -130,11 +130,13 @@ async def __aexit__(
         if not self._active:
             raise RuntimeError(f'The {self.__class__.__name__} is not active.')
 
+        # Stop the periodic persist state emission and manually emit one last event so the latest state is saved.
+        await self._emit_persist_state_event_rec_task.stop()
+        await self._emit_persist_state_event()
         await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
         self._event_emitter.remove_all_listeners()
         self._listener_tasks.clear()
         self._listeners_to_wrappers.clear()
-        await self._emit_persist_state_event_rec_task.stop()
         self._active = False
 
     @overload
diff --git a/src/crawlee/statistics/_models.py b/src/crawlee/statistics/_models.py
index 4810b0511b..11b4310f3a 100644
--- a/src/crawlee/statistics/_models.py
+++ b/src/crawlee/statistics/_models.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import json
+import warnings
 from dataclasses import asdict, dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Annotated, Any
@@ -76,7 +77,6 @@ class StatisticsState(BaseModel):
     crawler_started_at: Annotated[datetime | None, Field(alias='crawlerStartedAt')] = None
     crawler_last_started_at: Annotated[datetime | None, Field(alias='crawlerLastStartTimestamp')] = None
     crawler_finished_at: Annotated[datetime | None, Field(alias='crawlerFinishedAt')] = None
-    crawler_runtime: Annotated[timedelta_ms, Field(alias='crawlerRuntimeMillis')] = timedelta()
     errors: dict[str, Any] = Field(default_factory=dict)
     retry_errors: dict[str, Any] = Field(alias='retryErrors', default_factory=dict)
     requests_with_status_code: dict[str, int] = Field(alias='requestsWithStatusCode', default_factory=dict)
@@ -93,6 +93,37 @@
         ),
     ] = {}
 
+    # Used to track the crawler runtime that has already been persisted, i.e. the runtime from previous runs.
+    _runtime_offset: Annotated[timedelta, Field(exclude=True)] = timedelta()
+
+    def model_post_init(self, /, __context: Any) -> None:
+        self._runtime_offset = self.crawler_runtime or self._runtime_offset
+
+    @property
+    def crawler_runtime(self) -> timedelta:
+        if self.crawler_last_started_at:
+            finished_at = self.crawler_finished_at or datetime.now(timezone.utc)
+            return self._runtime_offset + finished_at - self.crawler_last_started_at
+        return self._runtime_offset
+
+    @crawler_runtime.setter
+    def crawler_runtime(self, value: timedelta) -> None:
+        # Setter kept for backwards compatibility only; `crawler_runtime` is now a computed field and can't be set
+        # manually. To be removed in the v2 release: https://github.com/apify/crawlee-python/issues/1567
+        warnings.warn(
+            "Setting 'crawler_runtime' is deprecated and will be removed in a future version."
+            f' Value {value} will not be used.',
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+    @computed_field(alias='crawlerRuntimeMillis')
+    def crawler_runtime_for_serialization(self) -> timedelta:
+        if self.crawler_last_started_at:
+            finished_at = self.crawler_finished_at or datetime.now(timezone.utc)
+            return self._runtime_offset + finished_at - self.crawler_last_started_at
+        return self._runtime_offset
+
     @computed_field(alias='requestTotalDurationMillis', return_type=timedelta_ms)  # type: ignore[prop-decorator]
     @property
     def request_total_duration(self) -> timedelta:
diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py
index 667b96eebe..51735b0056 100644
--- a/src/crawlee/statistics/_statistics.py
+++ b/src/crawlee/statistics/_statistics.py
@@ -110,9 +110,6 @@ def __init__(
         # Flag to indicate the context state.
         self._active = False
 
-        # Pre-existing runtime offset, that can be non-zero when restoring serialized state from KVS.
-        self._runtime_offset = timedelta(seconds=0)
-
     def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]:
         """Create near copy of the `Statistics` with replaced `state_model`."""
         new_statistics: Statistics[TNewStatisticsState] = Statistics(
@@ -168,8 +165,8 @@ async def __aenter__(self) -> Self:
             raise RuntimeError(f'The {self.__class__.__name__} is already active.')
 
         await self._state.initialize()
-
-        self._runtime_offset = self.state.crawler_runtime
+        # Reset `crawler_finished_at` to indicate a new run in progress.
+        self.state.crawler_finished_at = None
 
         # Start periodic logging and let it print initial state before activation.
         self._periodic_logger.start()
@@ -200,10 +197,6 @@ async def __aexit__(
         # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime
         await self._periodic_logger.stop()
         self.state.crawler_finished_at = datetime.now(timezone.utc)
-        self.state.crawler_runtime = (
-            self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at
-        )
-
         self._active = False
         await self._state.teardown()
 
@@ -262,20 +255,8 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None:
 
         del self._requests_in_progress[request_id_or_key]
 
-    def _update_crawler_runtime(self) -> None:
-        current_run_duration = (
-            (datetime.now(timezone.utc) - self.state.crawler_last_started_at)
-            if self.state.crawler_last_started_at
-            else timedelta()
-        )
-        self.state.crawler_runtime = current_run_duration + self._runtime_offset
-
     def calculate(self) -> FinalStatistics:
         """Calculate the current statistics."""
-        if self._active:
-            # Only update state when active. If not, just report the last known runtime.
-            self._update_crawler_runtime()
-
         total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
         serialized_state = state.model_dump(by_alias=False)
diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py
index 4cbed2c4e2..c7dad2725c 100644
--- a/tests/unit/crawlers/_basic/test_basic_crawler.py
+++ b/tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -1673,7 +1673,6 @@ def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsSta
     return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
 
 
-@pytest.mark.skip(reason='This test is flaky, see https://github.com/apify/crawlee-python/issues/1560.')
 async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
     """Test that crawler statistics persist and are loaded correctly.
 
diff --git a/tests/unit/events/test_event_manager.py b/tests/unit/events/test_event_manager.py
index 05ac60a52c..4654efaf64 100644
--- a/tests/unit/events/test_event_manager.py
+++ b/tests/unit/events/test_event_manager.py
@@ -5,6 +5,7 @@
 from datetime import timedelta
 from functools import update_wrapper
 from typing import TYPE_CHECKING, Any
+from unittest import mock
 from unittest.mock import AsyncMock, MagicMock
 
 import pytest
@@ -207,3 +208,14 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event
         await event_manager.wait_for_all_listeners_to_complete()
 
     assert event_manager.active is True
+
+
+async def test_event_manager_in_context_persistence() -> None:
+    """Test that using the `EventManager` as a context manager emits the persist state event at least once."""
+    event_manager = EventManager()
+
+    with mock.patch.object(event_manager, '_emit_persist_state_event', AsyncMock()) as mocked_emit_persist_state_event:
+        async with event_manager:
+            pass
+
+    assert mocked_emit_persist_state_event.call_count >= 1
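
Reviewer note (illustration only, not part of the patch): the `_models.py` hunk replaces the stored `crawler_runtime` field with a value computed from a persisted offset plus the window of the current run. The standalone sketch below uses simplified stand-in names (`RuntimeSketch`, `offset`) rather than the crawlee API; it shows the arithmetic and why a resumed run keeps accumulating runtime instead of restarting from zero.

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class RuntimeSketch:
    # Runtime carried over from previous runs (the role of `_runtime_offset`).
    offset: timedelta = field(default_factory=timedelta)
    last_started_at: datetime | None = None
    finished_at: datetime | None = None

    @property
    def runtime(self) -> timedelta:
        # A finished run measures up to `finished_at`; a run still in progress
        # measures up to "now". Previous runs are added via the offset.
        if self.last_started_at is None:
            return self.offset
        end = self.finished_at or datetime.now(timezone.utc)
        return self.offset + (end - self.last_started_at)


# First run: started at 12:00:00, finished at 12:00:10 -> 10 s of runtime.
first = RuntimeSketch(
    last_started_at=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    finished_at=datetime(2024, 1, 1, 12, 0, 10, tzinfo=timezone.utc),
)
assert first.runtime == timedelta(seconds=10)

# Resumed run: the persisted runtime becomes the offset and `finished_at` is
# cleared, so the reported runtime grows from 10 s rather than from zero.
resumed = RuntimeSketch(offset=first.runtime, last_started_at=datetime.now(timezone.utc))
assert resumed.runtime >= timedelta(seconds=10)

The same reasoning explains why `Statistics.__aenter__` now resets `crawler_finished_at` to `None`: a stale non-`None` value would freeze the computed runtime at the previous run's end.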
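
A second illustration, also outside the patch: the `_event_manager.py` hunk makes `__aexit__` stop the recurring persist task and emit one final persist state event while the state-saving listeners are still registered, before they are awaited and removed. The minimal asyncio sketch below uses hypothetical names to show the ordering that guarantees the last state snapshot is saved.

import asyncio
from collections.abc import Awaitable, Callable


class ShutdownOrderSketch:
    """Mimics the fixed __aexit__ ordering: final emit first, then teardown."""

    def __init__(self) -> None:
        self._persist_listeners: list[Callable[[], Awaitable[None]]] = []
        self.persisted: list[str] = []

    def on_persist_state(self, listener: Callable[[], Awaitable[None]]) -> None:
        self._persist_listeners.append(listener)

    async def _emit_persist_state_event(self) -> None:
        for listener in self._persist_listeners:
            await listener()

    async def close(self) -> None:
        # The final persist event goes out while the state-saving listener is
        # still registered; only afterwards are the listeners removed.
        await self._emit_persist_state_event()
        self._persist_listeners.clear()


async def main() -> None:
    manager = ShutdownOrderSketch()

    async def save_state() -> None:
        manager.persisted.append('latest-state')

    manager.on_persist_state(save_state)
    await manager.close()
    # The listener ran before teardown, so the final snapshot was not lost.
    assert manager.persisted == ['latest-state']


asyncio.run(main())

This ordering is also why the explicit `PERSIST_STATE` emission in `BasicCrawler._run_crawler` could be deleted: the event manager's own shutdown now covers it.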