Skip to content

Commit

Permalink
feat: Upgrade to Crawlee v0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
vdusek committed Dec 17, 2024
1 parent fa7bb9d commit 18efd5d
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 115 deletions.
40 changes: 20 additions & 20 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ keywords = [
python = "^3.9"
apify-client = ">=1.8.1"
apify-shared = ">=1.2.1"
crawlee = "~0.4.0"
crawlee = "==0.5.0b17"
cryptography = ">=42.0.0"
# TODO: relax the upper bound once the issue is resolved:
# https://github.com/apify/apify-sdk-python/issues/348
httpx = "~0.27.0"
lazy-object-proxy = ">=1.10.0"
scrapy = { version = ">=2.11.0", optional = true }
typing-extensions = ">=4.1.0"
# TODO: relax the upper bound once the issue is resolved:
# https://github.com/apify/apify-sdk-python/issues/325
websockets = ">=10.0,<14.0.0"

[tool.poetry.group.dev.dependencies]
Expand Down
53 changes: 31 additions & 22 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee import service_locator
from crawlee.events._types import Event, EventPersistStateData
from crawlee.memory_storage_client import MemoryStorageClient

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
Expand Down Expand Up @@ -69,17 +70,31 @@ def __init__(
self._configure_logging = configure_logging
self._apify_client = self.new_client()

self._event_manager: EventManager
if self._configuration.is_at_home:
self._event_manager = PlatformEventManager(
# We need to keep both local & cloud storage clients because of the `force_cloud` option.
self._local_storage_client = MemoryStorageClient.from_config()
self._cloud_storage_client = ApifyStorageClient(configuration=self._configuration)

# Set the event manager based on whether the Actor is running on the platform or locally.
self._event_manager = (
PlatformEventManager(
config=self._configuration,
persist_state_interval=self._configuration.persist_state_interval,
)
else:
self._event_manager = LocalEventManager(
if self.is_at_home()
else LocalEventManager(
system_info_interval=self._configuration.system_info_interval,
persist_state_interval=self._configuration.persist_state_interval,
)
)

# Register services in the service locator.
if self.is_at_home():
service_locator.set_storage_client(self._cloud_storage_client)
else:
service_locator.set_storage_client(self._local_storage_client)

service_locator.set_event_manager(self.event_manager)
service_locator.set_configuration(self.configuration)

self._is_initialized = False

Expand All @@ -93,7 +108,7 @@ async def __aenter__(self) -> Self:
executing the block code, the `Actor.fail` method is called.
"""
if self._configure_logging:
_configure_logging(self._configuration)
_configure_logging()

await self.init()
return self
Expand Down Expand Up @@ -182,16 +197,6 @@ async def init(self) -> None:
if self._is_initialized:
raise RuntimeError('The Actor was already initialized!')

if self._configuration.token:
service_container.set_cloud_storage_client(ApifyStorageClient(configuration=self._configuration))

if self._configuration.is_at_home:
service_container.set_default_storage_client_type('cloud')
else:
service_container.set_default_storage_client_type('local')

service_container.set_event_manager(self._event_manager)

self._is_exiting = False
self._was_final_persist_state_emitted = False

Expand Down Expand Up @@ -243,7 +248,6 @@ async def finalize() -> None:
await self._event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout)

await self._event_manager.__aexit__(None, None, None)
cast(dict, service_container._services).clear() # noqa: SLF001

await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds())
self._is_initialized = False
Expand Down Expand Up @@ -347,11 +351,13 @@ async def open_dataset(
self._raise_if_not_initialized()
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)

storage_client = self._cloud_storage_client if force_cloud else service_locator.get_storage_client()

return await Dataset.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=service_container.get_storage_client(client_type='cloud' if force_cloud else None),
storage_client=storage_client,
)

async def open_key_value_store(
Expand Down Expand Up @@ -379,12 +385,13 @@ async def open_key_value_store(
"""
self._raise_if_not_initialized()
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
storage_client = self._cloud_storage_client if force_cloud else service_locator.get_storage_client()

return await KeyValueStore.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=service_container.get_storage_client(client_type='cloud' if force_cloud else None),
storage_client=storage_client,
)

async def open_request_queue(
Expand Down Expand Up @@ -415,11 +422,13 @@ async def open_request_queue(
self._raise_if_not_initialized()
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)

storage_client = self._cloud_storage_client if force_cloud else service_locator.get_storage_client()

return await RequestQueue.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=service_container.get_storage_client(client_type='cloud' if force_cloud else None),
storage_client=storage_client,
)

async def push_data(self, data: dict | list[dict]) -> None:
Expand Down Expand Up @@ -941,7 +950,7 @@ async def create_proxy_configuration(
password: str | None = None,
groups: list[str] | None = None,
country_code: str | None = None,
proxy_urls: list[str] | None = None,
proxy_urls: list[str | None] | None = None,
new_url_function: _NewUrlFunction | None = None,
) -> ProxyConfiguration | None:
"""Create a ProxyConfiguration object with the passed proxy configuration.
Expand Down
12 changes: 10 additions & 2 deletions src/apify/_configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from datetime import datetime, timedelta
from logging import getLogger
from typing import Annotated, Any

from pydantic import AliasChoices, BeforeValidator, Field
Expand All @@ -12,6 +13,8 @@

from apify._utils import docs_group

logger = getLogger(__name__)


def _transform_to_list(value: Any) -> list[str] | None:
if value is None:
Expand Down Expand Up @@ -353,6 +356,11 @@ class Configuration(CrawleeConfiguration):
),
] = None

@classmethod
def get_global_configuration(cls) -> Configuration:
    """Retrieve the global instance of the configuration.

    Mostly for the backwards compatibility. It is recommended to use the `service_locator.get_configuration()`
    instead.
    """
    return cls()


# Monkey-patch the base class so that it works with the extended configuration
CrawleeConfiguration.get_global_configuration = Configuration.get_global_configuration  # type: ignore[method-assign]
6 changes: 3 additions & 3 deletions src/apify/_proxy_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ def __init__(
password: str | None = None,
groups: list[str] | None = None,
country_code: str | None = None,
proxy_urls: list[str] | None = None,
proxy_urls: list[str | None] | None = None,
new_url_function: _NewUrlFunction | None = None,
tiered_proxy_urls: list[list[str]] | None = None,
tiered_proxy_urls: list[list[str | None]] | None = None,
_actor_config: Configuration | None = None,
_apify_client: ApifyClientAsync | None = None,
) -> None:
Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(
' "groups" or "country_code".'
)

if proxy_urls and any('apify.com' in url for url in proxy_urls):
if proxy_urls and any('apify.com' in (url or '') for url in proxy_urls):
logger.warning(
'Some Apify proxy features may work incorrectly. Please consider setting up Apify properties '
'instead of `proxy_urls`.\n'
Expand Down
12 changes: 4 additions & 8 deletions src/apify/log.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from apify_shared.utils import ignore_docs
from crawlee._log_config import CrawleeLogFormatter, configure_logger, get_configured_log_level

if TYPE_CHECKING:
from apify import Configuration

# Name of the logger used throughout the library (resolves to 'apify')
logger_name = __name__.split('.')[0]

Expand All @@ -21,11 +17,11 @@ class ActorLogFormatter(CrawleeLogFormatter): # noqa: D101 (Inherited from pare
pass


def _configure_logging(configuration: Configuration) -> None:
def _configure_logging() -> None:
apify_client_logger = logging.getLogger('apify_client')
configure_logger(apify_client_logger, configuration, remove_old_handlers=True)
configure_logger(apify_client_logger, remove_old_handlers=True)

level = get_configured_log_level(configuration)
level = get_configured_log_level()

# Keep apify_client logger quiet unless debug logging is requested
if level > logging.DEBUG:
Expand All @@ -42,4 +38,4 @@ def _configure_logging(configuration: Configuration) -> None:

# Use configured log level for apify logger
apify_logger = logging.getLogger('apify')
configure_logger(apify_logger, configuration, remove_old_handlers=True)
configure_logger(apify_logger, remove_old_handlers=True)
Loading

0 comments on commit 18efd5d

Please sign in to comment.