diff --git a/src/crawlee/memory_storage_client/_request_queue_client.py b/src/crawlee/memory_storage_client/_request_queue_client.py index 686b11de3b..c5b60b9c21 100644 --- a/src/crawlee/memory_storage_client/_request_queue_client.py +++ b/src/crawlee/memory_storage_client/_request_queue_client.py @@ -73,6 +73,8 @@ def __init__( self.file_operation_lock = asyncio.Lock() self._last_used_timestamp = Decimal(0.0) + self._in_progress = set[str]() + @property def resource_info(self) -> RequestQueueMetadata: """Get the resource info for the request queue client.""" @@ -171,7 +173,7 @@ async def delete(self) -> None: await asyncio.to_thread(shutil.rmtree, queue.resource_directory) @override - async def list_head(self, *, limit: int | None = None) -> RequestQueueHead: + async def list_head(self, *, limit: int | None = None, skip_in_progress: bool = False) -> RequestQueueHead: existing_queue_by_id = find_or_create_client_by_id_or_name_inner( resource_client_class=RequestQueueClient, memory_storage_client=self._memory_storage_client, @@ -196,6 +198,9 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead: if len(requests) == limit: break + if skip_in_progress and request_key in existing_queue_by_id._in_progress: # noqa: SLF001 + continue + request = existing_queue_by_id.requests.get(request_key) # Check that the request still exists and was not handled, @@ -214,7 +219,21 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead: @override async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks: - result = await self.list_head(limit=limit) + existing_queue_by_id = find_or_create_client_by_id_or_name_inner( + resource_client_class=RequestQueueClient, + memory_storage_client=self._memory_storage_client, + id=self.id, + name=self.name, + ) + + if existing_queue_by_id is None: + raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self.id) + + result = await self.list_head(limit=limit, skip_in_progress=True) + + for item in result.items: + existing_queue_by_id._in_progress.add(item.id) # noqa: SLF001 + return RequestQueueHeadWithLocks( lock_secs=lock_secs, limit=result.limit, @@ -344,6 +363,9 @@ async def update_request( persist_storage=self._memory_storage_client.persist_storage, ) + if request.handled_at is not None: + existing_queue_by_id._in_progress.discard(request.id) # noqa: SLF001 + return ProcessedRequest( id=request_model.id, unique_key=request_model.unique_key, @@ -395,7 +417,17 @@ async def delete_request_lock( *, forefront: bool = False, ) -> None: - return None + existing_queue_by_id = find_or_create_client_by_id_or_name_inner( + resource_client_class=RequestQueueClient, + memory_storage_client=self._memory_storage_client, + id=self.id, + name=self.name, + ) + + if existing_queue_by_id is None: + raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self.id) + + existing_queue_by_id._in_progress.discard(request_id) # noqa: SLF001 @override async def batch_add_requests( diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index e05f88d288..decebcb5ab 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -5,7 +5,7 @@ from contextlib import suppress from datetime import datetime, timedelta, timezone from logging import getLogger -from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar +from typing import TYPE_CHECKING, Any, TypedDict, TypeVar from typing_extensions import override @@ -32,30 +32,6 @@ T = TypeVar('T') -class BoundedSet(Generic[T]): - """A simple set datastructure that removes the least recently accessed item when it reaches `max_length`.""" - - def __init__(self, max_length: int) -> None: - self._max_length = max_length - self._data = OrderedDict[T, object]() - - def __contains__(self, item: T) -> bool: - found = item in self._data - if found: - self._data.move_to_end(item, last=True) - return found - - def add(self, item: T) -> None: - self._data[item] = True - self._data.move_to_end(item) - - if len(self._data) > self._max_length: - self._data.popitem(last=False) - - def clear(self) -> None: - self._data.clear() - - class CachedRequest(TypedDict): id: str was_already_handled: bool @@ -99,12 +75,6 @@ class RequestQueue(BaseStorage, RequestProvider): _MAX_CACHED_REQUESTS = 1_000_000 """Maximum number of requests that can be cached.""" - _RECENTLY_HANDLED_CACHE_SIZE = 1000 - """Cache size for recently handled requests.""" - - _STORAGE_CONSISTENCY_DELAY = timedelta(seconds=3) - """Expected delay for storage to achieve consistency, guiding the timing of subsequent read operations.""" - def __init__( self, id: str, @@ -119,7 +89,6 @@ def __init__( # Get resource clients from storage client self._resource_client = client.request_queue(self._id) - self._resource_collection_client = client.request_queues() self._request_lock_time = timedelta(minutes=3) self._queue_paused_for_migration = False @@ -136,9 +105,7 @@ def __init__( self._assumed_handled_count = 0 self._queue_head_dict: OrderedDict[str, str] = OrderedDict() self._list_head_and_lock_task: asyncio.Task | None = None - self._in_progress: set[str] = set() self._last_activity = datetime.now(timezone.utc) - self._recently_handled: BoundedSet[str] = BoundedSet(max_length=self._RECENTLY_HANDLED_CACHE_SIZE) self._requests_cache: LRUCache[CachedRequest] = LRUCache(max_length=self._MAX_CACHED_REQUESTS) @override @@ -211,15 +178,7 @@ async def add_request( self._cache_request(cache_key, processed_request) - request_id, was_already_present = processed_request.id, processed_request.was_already_present - is_handled = request.handled_at is not None - - if ( - not is_handled - and not was_already_present - and request_id not in self._in_progress - and request_id not in self._recently_handled - ): + if request.handled_at is None and not processed_request.was_already_present: self._assumed_total_count += 1 return processed_request @@ -302,27 +261,7 @@ async def fetch_next_request(self) -> Request | None: return None next_request_id, _ = self._queue_head_dict.popitem(last=False) # ~removeFirst() - - # This should never happen, but... - if next_request_id in self._in_progress or next_request_id in self._recently_handled: - logger.warning( - 'Queue head returned a request that is already in progress?!', - extra={ - 'nextRequestId': next_request_id, - 'inProgress': next_request_id in self._in_progress, - 'recentlyHandled': next_request_id in self._recently_handled, - }, - ) - return None - - self._in_progress.add(next_request_id) - - try: - request = await self._get_or_hydrate_request(next_request_id) - except Exception: - # On error, remove the request from in progress, otherwise it would be there forever - self._in_progress.remove(next_request_id) - raise + request = await self._get_or_hydrate_request(next_request_id) # NOTE: It can happen that the queue head index is inconsistent with the main queue table. # This can occur in two situations: @@ -338,10 +277,6 @@ async def fetch_next_request(self) -> Request | None: 'Cannot find a request from the beginning of queue, will be retried later', extra={'nextRequestId': next_request_id}, ) - asyncio.get_running_loop().call_later( - self._STORAGE_CONSISTENCY_DELAY.total_seconds(), - lambda: self._in_progress.remove(next_request_id), - ) return None # 2) @@ -354,7 +289,6 @@ async def fetch_next_request(self) -> Request | None: 'Request fetched from the beginning of queue was already handled', extra={'nextRequestId': next_request_id}, ) - self._recently_handled.add(next_request_id) return None return request @@ -372,19 +306,12 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | """ self._last_activity = datetime.now(timezone.utc) - if request.id not in self._in_progress: - logger.debug(f'Cannot mark request (ID: {request.id}) as handled, because it is not in progress!') - return None - if request.handled_at is None: request.handled_at = datetime.now(timezone.utc) processed_request = await self._resource_client.update_request(request) processed_request.unique_key = request.unique_key - self._in_progress.remove(request.id) - self._recently_handled.add(request.id) - if not processed_request.was_already_handled: self._assumed_handled_count += 1 @@ -410,10 +337,6 @@ async def reclaim_request( """ self._last_activity = datetime.now(timezone.utc) - if request.id not in self._in_progress: - logger.debug(f'Cannot reclaim request (ID: {request.id}), because it is not in progress!') - return None - # TODO: If request hasn't been changed since the last get_request(), we don't need to call update_request() # and thus improve performance. # https://github.com/apify/apify-sdk-python/issues/143 @@ -422,11 +345,6 @@ async def reclaim_request( self._cache_request(unique_key_to_request_id(request.unique_key), processed_request) if processed_request: - # Mark the request as no longer in progress, - # as the moment we delete the lock, we could end up also re-fetching the request in a subsequent - # _ensure_head_is_non_empty() which could potentially lock the request again - self._in_progress.discard(request.id) - # Try to delete the request lock if possible try: await self._resource_client.delete_request_lock(request.id, forefront=forefront) @@ -453,21 +371,6 @@ async def is_finished(self) -> bool: Returns: bool: `True` if all requests were already handled and there are no more left. `False` otherwise. """ - seconds_since_last_activity = datetime.now(timezone.utc) - self._last_activity - if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout: - logger.warning( - f'The request queue seems to be stuck for {self._internal_timeout.total_seconds()}s, ' - 'resetting internal state.', - extra={ - 'queue_head_ids_pending': len(self._queue_head_dict), - 'in_progress': list(self._in_progress), - }, - ) - - # We only need to reset these two variables, no need to reset all the other stats - self._queue_head_dict.clear() - self._in_progress.clear() - if self._queue_head_dict: logger.debug( 'There are still ids in the queue head that are pending processing', @@ -478,24 +381,28 @@ async def is_finished(self) -> bool: return False - if self._in_progress: + await self._ensure_head_is_non_empty() + + if self._queue_head_dict: logger.debug( - 'There are still requests in progress (or zombie)', - extra={ - 'in_progress': list(self._in_progress), - }, + 'Queue head still returned requests that need to be processed (or that are locked by other clients)' ) return False + metadata = await self._resource_client.get() + if metadata is not None and not metadata.had_multiple_clients and not self._queue_head_dict: + logger.debug('Queue head is empty and there are no other clients - we are finished') + + return True + current_head = await self._resource_client.list_head(limit=2) - if current_head.items: - logger.debug( - 'Queue head still returned requests that need to be processed (or that are locked by other clients)', - ) + if not current_head.items: + await asyncio.sleep(5) # TODO not sure how long we should sleep + current_head = await self._resource_client.list_head(limit=2) - return not current_head.items and not self._in_progress + return len(current_head.items) == 0 async def get_info(self) -> RequestQueueMetadata | None: """Get an object containing general information about the request queue.""" @@ -536,19 +443,12 @@ async def _list_head_and_lock(self) -> None: for request in response.items: # Queue head index might be behind the main table, so ensure we don't recycle requests - if ( - not request.id - or not request.unique_key - or request.id in self._in_progress - or request.id in self._recently_handled - ): + if not request.id or not request.unique_key: logger.debug( 'Skipping request from queue head, already in progress or recently handled', extra={ 'id': request.id, 'unique_key': request.unique_key, - 'in_progress': request.id in self._in_progress, - 'recently_handled': request.id in self._recently_handled, }, ) @@ -570,14 +470,9 @@ async def _list_head_and_lock(self) -> None: ), ) - def _in_progress_count(self) -> int: - return len(self._in_progress) - def _reset(self) -> None: self._queue_head_dict.clear() self._list_head_and_lock_task = None - self._in_progress.clear() - self._recently_handled.clear() self._assumed_total_count = 0 self._assumed_handled_count = 0 self._requests_cache.clear()