Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Simplify request queue implementation #653

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions src/crawlee/memory_storage_client/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def __init__(
self.file_operation_lock = asyncio.Lock()
self._last_used_timestamp = Decimal(0.0)

self._in_progress = set[str]()

@property
def resource_info(self) -> RequestQueueMetadata:
"""Get the resource info for the request queue client."""
Expand Down Expand Up @@ -171,7 +173,7 @@ async def delete(self) -> None:
await asyncio.to_thread(shutil.rmtree, queue.resource_directory)

@override
async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:
async def list_head(self, *, limit: int | None = None, skip_in_progress: bool = False) -> RequestQueueHead:
existing_queue_by_id = find_or_create_client_by_id_or_name_inner(
resource_client_class=RequestQueueClient,
memory_storage_client=self._memory_storage_client,
Expand All @@ -196,6 +198,9 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:
if len(requests) == limit:
break

if skip_in_progress and request_key in existing_queue_by_id._in_progress: # noqa: SLF001
continue

request = existing_queue_by_id.requests.get(request_key)

# Check that the request still exists and was not handled,
Expand All @@ -214,7 +219,21 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:

@override
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks:
result = await self.list_head(limit=limit)
existing_queue_by_id = find_or_create_client_by_id_or_name_inner(
resource_client_class=RequestQueueClient,
memory_storage_client=self._memory_storage_client,
id=self.id,
name=self.name,
)

if existing_queue_by_id is None:
raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self.id)

result = await self.list_head(limit=limit, skip_in_progress=True)

for item in result.items:
existing_queue_by_id._in_progress.add(item.id) # noqa: SLF001

return RequestQueueHeadWithLocks(
lock_secs=lock_secs,
limit=result.limit,
Expand Down Expand Up @@ -344,6 +363,9 @@ async def update_request(
persist_storage=self._memory_storage_client.persist_storage,
)

if request.handled_at is not None:
existing_queue_by_id._in_progress.discard(request.id) # noqa: SLF001

return ProcessedRequest(
id=request_model.id,
unique_key=request_model.unique_key,
Expand Down Expand Up @@ -395,7 +417,17 @@ async def delete_request_lock(
*,
forefront: bool = False,
) -> None:
return None
existing_queue_by_id = find_or_create_client_by_id_or_name_inner(
resource_client_class=RequestQueueClient,
memory_storage_client=self._memory_storage_client,
id=self.id,
name=self.name,
)

if existing_queue_by_id is None:
raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self.id)

existing_queue_by_id._in_progress.discard(request_id) # noqa: SLF001

@override
async def batch_add_requests(
Expand Down
141 changes: 18 additions & 123 deletions src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import suppress
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar
from typing import TYPE_CHECKING, Any, TypedDict, TypeVar

from typing_extensions import override

Expand All @@ -32,30 +32,6 @@
T = TypeVar('T')


class BoundedSet(Generic[T]):
"""A simple set datastructure that removes the least recently accessed item when it reaches `max_length`."""

def __init__(self, max_length: int) -> None:
self._max_length = max_length
self._data = OrderedDict[T, object]()

def __contains__(self, item: T) -> bool:
found = item in self._data
if found:
self._data.move_to_end(item, last=True)
return found

def add(self, item: T) -> None:
self._data[item] = True
self._data.move_to_end(item)

if len(self._data) > self._max_length:
self._data.popitem(last=False)

def clear(self) -> None:
self._data.clear()


class CachedRequest(TypedDict):
id: str
was_already_handled: bool
Expand Down Expand Up @@ -99,12 +75,6 @@ class RequestQueue(BaseStorage, RequestProvider):
_MAX_CACHED_REQUESTS = 1_000_000
"""Maximum number of requests that can be cached."""

_RECENTLY_HANDLED_CACHE_SIZE = 1000
"""Cache size for recently handled requests."""

_STORAGE_CONSISTENCY_DELAY = timedelta(seconds=3)
"""Expected delay for storage to achieve consistency, guiding the timing of subsequent read operations."""

def __init__(
self,
id: str,
Expand All @@ -119,7 +89,6 @@ def __init__(

# Get resource clients from storage client
self._resource_client = client.request_queue(self._id)
self._resource_collection_client = client.request_queues()

self._request_lock_time = timedelta(minutes=3)
self._queue_paused_for_migration = False
Expand All @@ -136,9 +105,7 @@ def __init__(
self._assumed_handled_count = 0
self._queue_head_dict: OrderedDict[str, str] = OrderedDict()
self._list_head_and_lock_task: asyncio.Task | None = None
self._in_progress: set[str] = set()
self._last_activity = datetime.now(timezone.utc)
self._recently_handled: BoundedSet[str] = BoundedSet(max_length=self._RECENTLY_HANDLED_CACHE_SIZE)
self._requests_cache: LRUCache[CachedRequest] = LRUCache(max_length=self._MAX_CACHED_REQUESTS)

@override
Expand Down Expand Up @@ -211,15 +178,7 @@ async def add_request(

self._cache_request(cache_key, processed_request)

request_id, was_already_present = processed_request.id, processed_request.was_already_present
is_handled = request.handled_at is not None

if (
not is_handled
and not was_already_present
and request_id not in self._in_progress
and request_id not in self._recently_handled
):
if request.handled_at is None and not processed_request.was_already_present:
self._assumed_total_count += 1

return processed_request
Expand Down Expand Up @@ -302,27 +261,7 @@ async def fetch_next_request(self) -> Request | None:
return None

next_request_id, _ = self._queue_head_dict.popitem(last=False) # ~removeFirst()

# This should never happen, but...
if next_request_id in self._in_progress or next_request_id in self._recently_handled:
logger.warning(
'Queue head returned a request that is already in progress?!',
extra={
'nextRequestId': next_request_id,
'inProgress': next_request_id in self._in_progress,
'recentlyHandled': next_request_id in self._recently_handled,
},
)
return None

self._in_progress.add(next_request_id)

try:
request = await self._get_or_hydrate_request(next_request_id)
except Exception:
# On error, remove the request from in progress, otherwise it would be there forever
self._in_progress.remove(next_request_id)
raise
request = await self._get_or_hydrate_request(next_request_id)

# NOTE: It can happen that the queue head index is inconsistent with the main queue table.
# This can occur in two situations:
Expand All @@ -338,10 +277,6 @@ async def fetch_next_request(self) -> Request | None:
'Cannot find a request from the beginning of queue, will be retried later',
extra={'nextRequestId': next_request_id},
)
asyncio.get_running_loop().call_later(
self._STORAGE_CONSISTENCY_DELAY.total_seconds(),
lambda: self._in_progress.remove(next_request_id),
)
return None

# 2)
Expand All @@ -354,7 +289,6 @@ async def fetch_next_request(self) -> Request | None:
'Request fetched from the beginning of queue was already handled',
extra={'nextRequestId': next_request_id},
)
self._recently_handled.add(next_request_id)
return None

return request
Expand All @@ -372,19 +306,12 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
"""
self._last_activity = datetime.now(timezone.utc)

if request.id not in self._in_progress:
logger.debug(f'Cannot mark request (ID: {request.id}) as handled, because it is not in progress!')
return None

if request.handled_at is None:
request.handled_at = datetime.now(timezone.utc)

processed_request = await self._resource_client.update_request(request)
processed_request.unique_key = request.unique_key

self._in_progress.remove(request.id)
self._recently_handled.add(request.id)

if not processed_request.was_already_handled:
self._assumed_handled_count += 1

Expand All @@ -410,10 +337,6 @@ async def reclaim_request(
"""
self._last_activity = datetime.now(timezone.utc)

if request.id not in self._in_progress:
logger.debug(f'Cannot reclaim request (ID: {request.id}), because it is not in progress!')
return None

# TODO: If request hasn't been changed since the last get_request(), we don't need to call update_request()
# and thus improve performance.
# https://github.com/apify/apify-sdk-python/issues/143
Expand All @@ -422,11 +345,6 @@ async def reclaim_request(
self._cache_request(unique_key_to_request_id(request.unique_key), processed_request)

if processed_request:
# Mark the request as no longer in progress,
# as the moment we delete the lock, we could end up also re-fetching the request in a subsequent
# _ensure_head_is_non_empty() which could potentially lock the request again
self._in_progress.discard(request.id)

# Try to delete the request lock if possible
try:
await self._resource_client.delete_request_lock(request.id, forefront=forefront)
Expand All @@ -453,21 +371,6 @@ async def is_finished(self) -> bool:
Returns:
bool: `True` if all requests were already handled and there are no more left. `False` otherwise.
"""
seconds_since_last_activity = datetime.now(timezone.utc) - self._last_activity
if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout:
logger.warning(
f'The request queue seems to be stuck for {self._internal_timeout.total_seconds()}s, '
'resetting internal state.',
extra={
'queue_head_ids_pending': len(self._queue_head_dict),
'in_progress': list(self._in_progress),
},
)

# We only need to reset these two variables, no need to reset all the other stats
self._queue_head_dict.clear()
self._in_progress.clear()

if self._queue_head_dict:
logger.debug(
'There are still ids in the queue head that are pending processing',
Expand All @@ -478,24 +381,28 @@ async def is_finished(self) -> bool:

return False

if self._in_progress:
await self._ensure_head_is_non_empty()

if self._queue_head_dict:
logger.debug(
'There are still requests in progress (or zombie)',
extra={
'in_progress': list(self._in_progress),
},
'Queue head still returned requests that need to be processed (or that are locked by other clients)'
)

return False

metadata = await self._resource_client.get()
if metadata is not None and not metadata.had_multiple_clients and not self._queue_head_dict:
logger.debug('Queue head is empty and there are no other clients - we are finished')

return True

current_head = await self._resource_client.list_head(limit=2)

if current_head.items:
logger.debug(
'Queue head still returned requests that need to be processed (or that are locked by other clients)',
)
if not current_head.items:
await asyncio.sleep(5) # TODO not sure how long we should sleep
current_head = await self._resource_client.list_head(limit=2)

return not current_head.items and not self._in_progress
return len(current_head.items) == 0

async def get_info(self) -> RequestQueueMetadata | None:
"""Get an object containing general information about the request queue."""
Expand Down Expand Up @@ -536,19 +443,12 @@ async def _list_head_and_lock(self) -> None:

for request in response.items:
# Queue head index might be behind the main table, so ensure we don't recycle requests
if (
not request.id
or not request.unique_key
or request.id in self._in_progress
or request.id in self._recently_handled
):
if not request.id or not request.unique_key:
logger.debug(
'Skipping request from queue head, already in progress or recently handled',
extra={
'id': request.id,
'unique_key': request.unique_key,
'in_progress': request.id in self._in_progress,
'recently_handled': request.id in self._recently_handled,
},
)

Expand All @@ -570,14 +470,9 @@ async def _list_head_and_lock(self) -> None:
),
)

def _in_progress_count(self) -> int:
return len(self._in_progress)

def _reset(self) -> None:
self._queue_head_dict.clear()
self._list_head_and_lock_task = None
self._in_progress.clear()
self._recently_handled.clear()
self._assumed_total_count = 0
self._assumed_handled_count = 0
self._requests_cache.clear()
Expand Down
Loading