diff --git a/docs/guides/code_examples/session_management/multi_sessions_http.py b/docs/guides/code_examples/session_management/multi_sessions_http.py
new file mode 100644
index 0000000000..aba4d5e9d0
--- /dev/null
+++ b/docs/guides/code_examples/session_management/multi_sessions_http.py
@@ -0,0 +1,85 @@
+import asyncio
+from datetime import timedelta
+from itertools import count
+from typing import Callable
+
+from crawlee import ConcurrencySettings, Request
+from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
+from crawlee.errors import RequestCollisionError
+from crawlee.sessions import Session, SessionPool
+
+
+# Define a function for creating sessions with simple logic for unique `id` generation.
+# This is necessary if you need to specify a particular session for the first request,
+# for example, during authentication.
+def create_session_function() -> Callable[[], Session]:
+    counter = count()
+
+    def create_session() -> Session:
+        return Session(
+            id=str(next(counter)),
+            max_usage_count=999_999,
+            max_age=timedelta(hours=999_999),
+            max_error_score=100,
+            blocked_status_codes=[403],
+        )
+
+    return create_session
+
+
+async def main() -> None:
+    crawler = HttpCrawler(
+        # Adjust request limits according to your pool size
+        concurrency_settings=ConcurrencySettings(max_tasks_per_minute=500),
+        # Requests are bound to specific sessions, no rotation needed
+        max_session_rotations=0,
+        session_pool=SessionPool(
+            max_pool_size=10, create_session_function=create_session_function()
+        ),
+    )
+
+    @crawler.router.default_handler
+    async def basic_handler(context: HttpCrawlingContext) -> None:
+        context.log.info(f'Processing {context.request.url}')
+
+    # Initialize the session and bind the next request to this session if needed
+    @crawler.router.handler(label='session_init')
+    async def session_init(context: HttpCrawlingContext) -> None:
+        next_requests = []
+        if context.session:
+            context.log.info(f'Init session {context.session.id}')
+            next_request = Request.from_url(
+                'https://placeholder.dev', session_id=context.session.id
+            )
+            next_requests.append(next_request)
+
+        await context.add_requests(next_requests)
+
+    # Handle errors when a session is blocked and no longer available in the pool
+    # when attempting to execute requests bound to it
+    @crawler.failed_request_handler
+    async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
+        if isinstance(error, RequestCollisionError) and context.session:
+            context.log.error(
+                f'Request {context.request.url} failed because the bound '
+                'session is unavailable'
+            )
+
+    # Create a pool of requests bound to their respective sessions
+    # Use `always_enqueue=True` if session initialization happens on a non-unique address,
+    # such as the site's main page
+    init_requests = [
+        Request.from_url(
+            'https://example.org/',
+            label='session_init',
+            session_id=str(session_id),
+            use_extended_unique_key=True,
+        )
+        for session_id in range(1, 11)
+    ]
+
+    await crawler.run(init_requests)
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/guides/code_examples/session_management/one_session_http.py b/docs/guides/code_examples/session_management/one_session_http.py
new file mode 100644
index 0000000000..28cec44b63
--- /dev/null
+++ b/docs/guides/code_examples/session_management/one_session_http.py
@@ -0,0 +1,56 @@
+import asyncio
+from datetime import timedelta
+
+from crawlee import ConcurrencySettings, Request
+from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
+from crawlee.errors import SessionError
+from crawlee.sessions import SessionPool
+
+
+async def main() -> None:
+    crawler = HttpCrawler(
+        # Limit requests per minute to reduce the chance of being blocked
+        concurrency_settings=ConcurrencySettings(max_tasks_per_minute=50),
+        # Disable session rotation
+        max_session_rotations=0,
+        session_pool=SessionPool(
+            # Only one session in the pool
+            max_pool_size=1,
+            create_session_settings={
+                # High value for session usage limit
+                'max_usage_count': 999_999,
+                # High value for session lifetime
+                'max_age': timedelta(hours=999_999),
+                # High score allows the session to encounter more errors
+                # before crawlee decides the session is blocked
+                # Make sure you know how to handle these errors
+                'max_error_score': 100,
+                # 403 status usually indicates you're already blocked
+                'blocked_status_codes': [403],
+            },
+        ),
+    )
+
+    # Basic request handling logic
+    @crawler.router.default_handler
+    async def basic_handler(context: HttpCrawlingContext) -> None:
+        context.log.info(f'Processing {context.request.url}')
+
+    # Handler for session initialization (authentication, initial cookies, etc.)
+    @crawler.router.handler(label='session_init')
+    async def session_init(context: HttpCrawlingContext) -> None:
+        if context.session:
+            context.log.info(f'Init session {context.session.id}')
+
+    # Monitor if our session gets blocked and explicitly stop the crawler
+    @crawler.error_handler
+    async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
+        if isinstance(error, SessionError) and context.session:
+            context.log.info(f'Session {context.session.id} blocked')
+            crawler.stop()
+
+    await crawler.run([Request.from_url('https://example.org/', label='session_init')])
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/guides/session_management.mdx b/docs/guides/session_management.mdx
index f172761204..20b4dfd1b2 100644
--- a/docs/guides/session_management.mdx
+++ b/docs/guides/session_management.mdx
@@ -15,6 +15,8 @@ import BeautifulSoupSource from '!!raw-loader!./code_examples/session_management
 import ParselSource from '!!raw-loader!./code_examples/session_management/session_management_parsel.py';
 import PlaywrightSource from '!!raw-loader!./code_examples/session_management/session_management_playwright.py';
 import StandaloneSource from '!!raw-loader!./code_examples/session_management/session_management_standalone.py';
+import OneSession from '!!raw-loader!./code_examples/session_management/one_session_http.py';
+import MultiSessions from '!!raw-loader!./code_examples/session_management/multi_sessions_http.py';
 
 The `SessionPool` class provides a robust way to manage the rotation of proxy IP addresses, cookies, and other custom settings in Crawlee. Its primary advantage is the ability to filter out blocked or non-functional proxies, ensuring that your scraper avoids retrying requests through known problematic proxies.
 
@@ -68,3 +70,25 @@ Now, let's explore examples of how to use the `S
 These examples demonstrate the basics of configuring and using the `SessionPool`.
 
 Please, bear in mind that `SessionPool` requires some time to establish a stable pool of working IPs. During the initial setup, you may encounter errors as the pool identifies and filters out blocked or non-functional IPs. This stabilization period is expected and will improve over time.
+
+## Configuring a single session
+
+In some cases, you need full control over session usage, for example when working with websites that require authentication or the initialization of certain parameters such as cookies.
+
+When working with a site that requires authentication, we typically don't want multiple sessions with different browser fingerprints or client parameters accessing the site. In this case, we need to configure the `SessionPool` appropriately:
+
+<CodeBlock className="language-python">
+    {OneSession}
+</CodeBlock>
+
+## Binding requests to specific sessions
+
+In the previous example, there's one obvious limitation: you're restricted to a single session.
+
+In some cases, you need to achieve the same behavior using multiple sessions in parallel, for example when authenticating with different profiles or using different proxies.
+
+To do this, use the `session_id` parameter of the `Request` object to bind a request to a specific session:
+
+<CodeBlock className="language-python">
+    {MultiSessions}
+</CodeBlock>
diff --git a/src/crawlee/_request.py b/src/crawlee/_request.py
index d7deba4ff5..adb43949ea 100644
--- a/src/crawlee/_request.py
+++ b/src/crawlee/_request.py
@@ -58,6 +58,9 @@ class CrawleeRequestData(BaseModel):
     crawl_depth: Annotated[int, Field(alias='crawlDepth')] = 0
     """The depth of the request in the crawl tree."""
 
+    session_id: Annotated[str | None, Field()] = None
+    """ID of a session to which the request is bound."""
+
 
 class UserData(BaseModel, MutableMapping[str, JsonSerializable]):
     """Represents the `user_data` part of a Request.
@@ -84,6 +87,7 @@ def __setitem__(self, key: str, value: JsonSerializable) -> None:
                 raise ValueError('`label` must be str or None')
 
             self.label = value
+
         self.__pydantic_extra__[key] = value
 
     def __delitem__(self, key: str) -> None:
@@ -119,6 +123,7 @@ class RequestOptions(TypedDict):
     headers: NotRequired[HttpHeaders | dict[str, str] | None]
     payload: NotRequired[HttpPayload | str | None]
     label: NotRequired[str | None]
+    session_id: NotRequired[str | None]
    unique_key: NotRequired[str | None]
     id: NotRequired[str | None]
     keep_url_fragment: NotRequired[bool]
@@ -227,6 +232,7 @@ def from_url(
         headers: HttpHeaders | dict[str, str] | None = None,
         payload: HttpPayload | str | None = None,
         label: str | None = None,
+        session_id: str | None = None,
         unique_key: str | None = None,
         id: str | None = None,
         keep_url_fragment: bool = False,
@@ -248,14 +254,17 @@ def from_url(
             payload: The data to be sent as the request body. Typically used with 'POST' or 'PUT' requests.
             label: A custom label to differentiate between request types. This is stored in `user_data`, and it is
                 used for request routing (different requests go to different handlers).
+            session_id: ID of a specific `Session` to which the request will be strictly bound.
+                If the session becomes unavailable when the request is processed, a `RequestCollisionError` will be
+                raised.
             unique_key: A unique key identifying the request. If not provided, it is automatically computed based on
                 the URL and other parameters. Requests with the same `unique_key` are treated as identical.
             id: A unique identifier for the request. If not provided, it is automatically generated from the
                 `unique_key`.
             keep_url_fragment: Determines whether the URL fragment (e.g., `#section`) should be included in
                 the `unique_key` computation. This is only relevant when `unique_key` is not provided.
-            use_extended_unique_key: Determines whether to include the HTTP method and payload in the `unique_key`
-                computation. This is only relevant when `unique_key` is not provided.
+            use_extended_unique_key: Determines whether to include the HTTP method, session ID, and payload in the
+                `unique_key` computation. This is only relevant when `unique_key` is not provided.
             always_enqueue: If set to `True`, the request will be enqueued even if it is already present in the queue.
                 Using this is not allowed when a custom `unique_key` is also provided and will result in a `ValueError`.
             **kwargs: Additional request properties.
@@ -274,6 +283,7 @@ def from_url(
                 method=method,
                 headers=headers,
                 payload=payload,
+                session_id=session_id,
                 keep_url_fragment=keep_url_fragment,
                 use_extended_unique_key=use_extended_unique_key,
             )
@@ -296,6 +306,9 @@ def from_url(
         if label is not None:
             request.user_data['label'] = label
 
+        if session_id is not None:
+            request.crawlee_data.session_id = session_id
+
         return request
 
     def get_query_param_from_url(self, param: str, *, default: str | None = None) -> str | None:
@@ -308,6 +321,11 @@ def label(self) -> str | None:
         """A string used to differentiate between arbitrary request types."""
         return cast('UserData', self.user_data).label
 
+    @property
+    def session_id(self) -> str | None:
+        """The ID of the bound session, if there is any."""
+        return self.crawlee_data.session_id
+
     @property
     def crawlee_data(self) -> CrawleeRequestData:
         """Crawlee-specific configuration stored in the `user_data`."""
diff --git a/src/crawlee/_utils/requests.py b/src/crawlee/_utils/requests.py
index a3d370cce6..a3b152a51d 100644
--- a/src/crawlee/_utils/requests.py
+++ b/src/crawlee/_utils/requests.py
@@ -78,6 +78,7 @@ def compute_unique_key(
     method: HttpMethod = 'GET',
     headers: HttpHeaders | None = None,
     payload: HttpPayload | None = None,
+    session_id: str | None = None,
     *,
     keep_url_fragment: bool = False,
     use_extended_unique_key: bool = False,
@@ -96,6 +97,7 @@ def compute_unique_key(
         payload: The data to be sent as the request body.
         keep_url_fragment: A flag indicating whether to keep the URL fragment.
         use_extended_unique_key: A flag indicating whether to include a hashed payload in the key.
+        session_id: The ID of a specific `Session` to which the request will be strictly bound.
 
     Returns:
         A string representing the unique key for the request.
@@ -114,9 +116,13 @@ def compute_unique_key(
     if use_extended_unique_key:
         payload_hash = _get_payload_hash(payload)
         headers_hash = _get_headers_hash(headers)
+        normalized_session = '' if session_id is None else session_id.lower()
 
         # Return the extended unique key. Use pipe as a separator of the different parts of the unique key.
-        return f'{normalized_method}|{headers_hash}|{payload_hash}|{normalized_url}'
+        extended_part = f'{normalized_method}|{headers_hash}|{payload_hash}'
+        if normalized_session:
+            extended_part = f'{extended_part}|{normalized_session}'
+        return f'{extended_part}|{normalized_url}'
 
     # Log information if there is a non-GET request with a payload.
     if normalized_method != 'GET' and payload:
diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py
index 9e805f7ec6..8c76da798e 100644
--- a/src/crawlee/crawlers/_basic/_basic_crawler.py
+++ b/src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -40,6 +40,7 @@
     ContextPipelineInterruptedError,
     HttpClientStatusCodeError,
     HttpStatusCodeError,
+    RequestCollisionError,
     RequestHandlerError,
     SessionError,
     UserDefinedErrorHandlerError,
@@ -449,6 +450,20 @@ async def _get_session(self) -> Session | None:
             logger=self._logger,
         )
 
+    async def _get_session_by_id(self, session_id: str | None) -> Session | None:
+        """If the session pool is being used, try to take a session by ID from it."""
+        if not self._use_session_pool or not session_id:
+            return None
+
+        return await wait_for(
+            partial(self._session_pool.get_session_by_id, session_id),
+            timeout=self._internal_timeout,
+            timeout_message='Fetching a session from the pool timed out after '
+            f'{self._internal_timeout.total_seconds()} seconds',
+            max_retries=3,
+            logger=self._logger,
+        )
+
     async def _get_proxy_info(self, request: Request, session: Session | None) -> ProxyInfo | None:
         """Retrieve a new ProxyInfo object based on crawler configuration and the current request and session."""
         if not self._proxy_configuration:
@@ -1065,7 +1080,10 @@ async def __run_task_function(self) -> None:
         if request is None:
             return
 
-        session = await self._get_session()
+        if request.session_id:
+            session = await self._get_session_by_id(request.session_id)
+        else:
+            session = await self._get_session()
         proxy_info = await self._get_proxy_info(request, session)
         result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store)
 
@@ -1088,6 +1106,8 @@ async def __run_task_function(self) -> None:
         try:
             request.state = RequestState.REQUEST_HANDLER
 
+            self._check_request_collision(context.request, context.session)
+
             try:
                 await self._run_request_handler(context=context)
             except asyncio.TimeoutError as e:
@@ -1110,6 +1130,10 @@ async def __run_task_function(self) -> None:
 
             self._statistics.record_request_processing_finish(statistics_id)
 
+        except RequestCollisionError as request_error:
+            context.request.no_retry = True
+            await self._handle_request_error(context, request_error)
+
         except RequestHandlerError as primary_error:
             primary_error = cast(
                 'RequestHandlerError[TCrawlingContext]', primary_error
@@ -1226,3 +1250,18 @@ def _raise_for_session_blocked_status_code(self, session: Session | None, status
             ignore_http_error_status_codes=self._ignore_http_error_status_codes,
         ):
             raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}')
+
+    def _check_request_collision(self, request: Request, session: Session | None) -> None:
+        """Raise an exception if a request cannot access required resources.
+
+        Args:
+            request: The `Request` that might require specific resources (like a session).
+            session: The `Session` that was retrieved for the request, or `None` if not available.
+
+        Raises:
+            RequestCollisionError: If the `Session` referenced by the `Request` is not available.
+ """ + if self._use_session_pool and request.session_id and not session: + raise RequestCollisionError( + f'The Session (id: {request.session_id}) bound to the Request is no longer available in SessionPool' + ) diff --git a/src/crawlee/errors.py b/src/crawlee/errors.py index 07f7848bdb..217ce31091 100644 --- a/src/crawlee/errors.py +++ b/src/crawlee/errors.py @@ -14,6 +14,7 @@ 'HttpClientStatusCodeError', 'HttpStatusCodeError', 'ProxyError', + 'RequestCollisionError', 'RequestHandlerError', 'ServiceConflictError', 'SessionError', @@ -106,3 +107,8 @@ def __init__(self, wrapped_exception: Exception, crawling_context: BasicCrawling @docs_group('Errors') class ContextPipelineInterruptedError(Exception): """May be thrown in the initialization phase of a middleware to signal that the request should not be processed.""" + + +@docs_group('Errors') +class RequestCollisionError(Exception): + """Raised when a request cannot be processed due to a conflict with required resources.""" diff --git a/tests/unit/_utils/test_requests.py b/tests/unit/_utils/test_requests.py index 4092b29e05..348ae95948 100644 --- a/tests/unit/_utils/test_requests.py +++ b/tests/unit/_utils/test_requests.py @@ -133,6 +133,7 @@ def test_compute_unique_key_complex() -> None: method='POST', headers=headers, payload=payload, + session_id='test_session', use_extended_unique_key=False, ) assert uk == url @@ -142,9 +143,10 @@ def test_compute_unique_key_complex() -> None: method='POST', headers=headers, payload=payload, + session_id='test_session', use_extended_unique_key=True, ) - assert extended_uk == 'POST|4e1a2cf6|9724c1e2|https://crawlee.dev' + assert extended_uk == 'POST|4e1a2cf6|9724c1e2|test_session|https://crawlee.dev' def test_compute_unique_key_post_with_none_payload() -> None: diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 181bc4d429..11e4ecfc85 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -22,16 +22,16 @@ from crawlee._types import BasicCrawlingContext, EnqueueLinksKwargs, HttpHeaders from crawlee.configuration import Configuration from crawlee.crawlers import BasicCrawler -from crawlee.errors import SessionError, UserDefinedErrorHandlerError +from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError from crawlee.events._local_event_manager import LocalEventManager from crawlee.request_loaders import RequestList, RequestManagerTandem -from crawlee.sessions import SessionPool +from crawlee.sessions import Session, SessionPool from crawlee.statistics import FinalStatistics from crawlee.storage_clients import MemoryStorageClient from crawlee.storages import Dataset, KeyValueStore, RequestQueue if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Callable, Sequence import respx @@ -1217,3 +1217,84 @@ async def handler(context: BasicCrawlingContext) -> None: assert sessions[1] != sessions[0] else: assert sessions[1] == sessions[0] + + +async def test_bound_session_to_request() -> None: + async with SessionPool() as session_pool: + check_session: Session = await session_pool.get_session() + used_sessions = list[str]() + crawler = BasicCrawler(session_pool=session_pool) + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + if context.session: + used_sessions.append(context.session.id) + + requests = [ + Request.from_url('http://a.com/', session_id=check_session.id, 
+        ]
+
+        await crawler.run(requests)
+
+        assert len(used_sessions) == 10
+        assert set(used_sessions) == {check_session.id}
+
+
+async def test_bound_sessions_to_same_request() -> None:
+    # Use a custom function to avoid errors due to random Session retrieval
+    def create_session_function() -> Callable[[], Session]:
+        counter = -1
+
+        def create_session() -> Session:
+            nonlocal counter
+            counter += 1
+            return Session(id=str(counter))
+
+        return create_session
+
+    check_sessions = [str(session_id) for session_id in range(10)]
+    used_sessions = list[str]()
+    crawler = BasicCrawler(session_pool=SessionPool(create_session_function=create_session_function()))
+
+    @crawler.router.default_handler
+    async def handler(context: BasicCrawlingContext) -> None:
+        if context.session:
+            used_sessions.append(context.session.id)
+
+    requests = [
+        Request.from_url('http://a.com/', session_id=str(session_id), use_extended_unique_key=True)
+        for session_id in range(10)
+    ]
+
+    await crawler.run(requests)
+
+    assert len(used_sessions) == 10
+    assert set(used_sessions) == set(check_sessions)
+
+
+async def test_error_bound_session_to_request() -> None:
+    crawler = BasicCrawler(request_handler=AsyncMock())
+
+    requests = [Request.from_url('http://a.com/', session_id='1', always_enqueue=True) for _ in range(10)]
+
+    stats = await crawler.run(requests)
+
+    assert stats.requests_total == 10
+    assert stats.requests_failed == 10
+    assert stats.retry_histogram == [10]
+
+
+async def test_handle_error_bound_session_to_request() -> None:
+    error_handler_mock = AsyncMock()
+    crawler = BasicCrawler(request_handler=AsyncMock())
+
+    @crawler.failed_request_handler
+    async def error_req_hook(context: BasicCrawlingContext, error: Exception) -> None:
+        if isinstance(error, RequestCollisionError):
+            await error_handler_mock(context, error)
+
+    requests = [Request.from_url('http://a.com/', session_id='1')]
+
+    await crawler.run(requests)
+
+    assert error_handler_mock.call_count == 1
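
Below is a minimal, self-contained sketch of how the pieces introduced in this patch fit together, distilled from the new docs examples and tests above; it is not part of the change itself. The URL, pool size, and handler names are illustrative, and it relies only on APIs shown in the diff: `Request.from_url(..., session_id=..., use_extended_unique_key=True)`, `SessionPool(create_session_function=...)`, `max_session_rotations=0`, and `RequestCollisionError` handled via `failed_request_handler`.

```python
import asyncio
from itertools import count
from typing import Callable

from crawlee import Request
from crawlee.crawlers import BasicCrawler, BasicCrawlingContext
from crawlee.errors import RequestCollisionError
from crawlee.sessions import Session, SessionPool


def create_session_function() -> Callable[[], Session]:
    # Predictable session ids ('0', '1', ...) so requests can be bound to them up front.
    counter = count()
    return lambda: Session(id=str(next(counter)))


async def main() -> None:
    crawler = BasicCrawler(
        session_pool=SessionPool(max_pool_size=3, create_session_function=create_session_function()),
        # A request bound to a session must not be rotated onto a different one.
        max_session_rotations=0,
    )

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        # context.session is the session whose id was passed as `session_id` below.
        session_id = context.session.id if context.session else 'none'
        context.log.info(f'{context.request.url} handled with session {session_id}')

    # If a bound session is no longer available in the pool, the request fails with
    # RequestCollisionError instead of being retried with another session.
    @crawler.failed_request_handler
    async def on_failed(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, RequestCollisionError):
            context.log.warning(f'Bound session unavailable for {context.request.url}')

    # `use_extended_unique_key=True` mixes the session id into the unique key, so the
    # same URL bound to different sessions is enqueued once per session.
    requests = [
        Request.from_url('https://example.org/', session_id=str(i), use_extended_unique_key=True)
        for i in range(3)
    ]
    await crawler.run(requests)


if __name__ == '__main__':
    asyncio.run(main())
```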