Skip to content

refactor: Introduce new Apify storage client #470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/apify/apify_storage_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
from apify.apify_storage_client._apify_storage_client import ApifyStorageClient
from ._dataset_client import ApifyDatasetClient
from ._key_value_store_client import ApifyKeyValueStoreClient
from ._request_queue_client import ApifyRequestQueueClient
from ._storage_client import ApifyStorageClient

__all__ = ['ApifyStorageClient']
__all__ = [
'ApifyDatasetClient',
'ApifyKeyValueStoreClient',
'ApifyRequestQueueClient',
'ApifyStorageClient',
]
72 changes: 0 additions & 72 deletions src/apify/apify_storage_client/_apify_storage_client.py

This file was deleted.

237 changes: 115 additions & 122 deletions src/apify/apify_storage_client/_dataset_client.py
Original file line number Diff line number Diff line change
@@ -1,190 +1,183 @@
from __future__ import annotations

from typing import TYPE_CHECKING
import asyncio
from logging import getLogger
from typing import TYPE_CHECKING, Any

from typing_extensions import override

from crawlee.storage_clients._base import DatasetClient as BaseDatasetClient
from apify_client import ApifyClientAsync
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

if TYPE_CHECKING:
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager

from httpx import Response
from datetime import datetime

from apify_client.clients import DatasetClientAsync
from crawlee._types import JsonSerializable

from apify import Configuration

class DatasetClient(BaseDatasetClient):
"""Dataset resource client implementation based on the Apify platform storage."""
logger = getLogger(__name__)

def __init__(self, apify_dataset_client: DatasetClientAsync) -> None:
self._client = apify_dataset_client

@override
async def get(self) -> DatasetMetadata | None:
result = await self._client.get()
return DatasetMetadata.model_validate(result) if result else None
class ApifyDatasetClient(DatasetClient):
"""An Apify platform implementation of the dataset client."""

@override
async def update(
def __init__(
self,
*,
name: str | None = None,
) -> DatasetMetadata:
return DatasetMetadata.model_validate(
await self._client.update(
name=name,
)
id: str,
name: str | None,
created_at: datetime,
accessed_at: datetime,
modified_at: datetime,
item_count: int,
api_client: DatasetClientAsync,
) -> None:
"""Initialize a new instance.

Preferably use the `ApifyDatasetClient.open` class method to create a new instance.
"""
self._metadata = DatasetMetadata(
id=id,
name=name,
created_at=created_at,
accessed_at=accessed_at,
modified_at=modified_at,
item_count=item_count,
)

self._api_client = api_client
"""The Apify dataset client for API operations."""

self._lock = asyncio.Lock()
"""A lock to ensure that only one operation is performed at a time."""

@override
async def delete(self) -> None:
await self._client.delete()
@property
def metadata(self) -> DatasetMetadata:
"""Return the `DatasetMetadata` object currently held by this client instance."""
return self._metadata

@override
async def list_items(
self,
@classmethod
async def open(
cls,
*,
offset: int | None = 0,
limit: int | None = BaseDatasetClient._LIST_ITEMS_LIMIT, # noqa: SLF001
clean: bool = False,
desc: bool = False,
fields: list[str] | None = None,
omit: list[str] | None = None,
unwind: str | None = None,
skip_empty: bool = False,
skip_hidden: bool = False,
flatten: list[str] | None = None,
view: str | None = None,
) -> DatasetItemsListPage:
return DatasetItemsListPage.model_validate(
vars(
await self._client.list_items(
offset=offset,
limit=limit,
clean=clean,
desc=desc,
fields=fields,
omit=omit,
unwind=unwind,
skip_empty=skip_empty,
skip_hidden=skip_hidden,
flatten=flatten,
view=view,
)
)
id: str | None,
name: str | None,
configuration: Configuration,
) -> ApifyDatasetClient:
token = configuration.token
api_url = configuration.api_base_url

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this should use `configuration.default_dataset_id` if both `id` and `name` are `None`.

# Otherwise, create a new one.
apify_client_async = ApifyClientAsync(
token=token,
api_url=api_url,
max_retries=8,
min_delay_between_retries_millis=500,
timeout_secs=360,
)

apify_datasets_client = apify_client_async.datasets()

metadata = DatasetMetadata.model_validate(
await apify_datasets_client.get_or_create(name=id if id is not None else name),
)

apify_dataset_client = apify_client_async.dataset(dataset_id=metadata.id)

return cls(
id=metadata.id,
name=metadata.name,
created_at=metadata.created_at,
accessed_at=metadata.accessed_at,
modified_at=metadata.modified_at,
item_count=metadata.item_count,
api_client=apify_dataset_client,
)

@override
async def iterate_items(
self,
*,
offset: int = 0,
limit: int | None = None,
clean: bool = False,
desc: bool = False,
fields: list[str] | None = None,
omit: list[str] | None = None,
unwind: str | None = None,
skip_empty: bool = False,
skip_hidden: bool = False,
) -> AsyncIterator[dict]:
async for item in self._client.iterate_items(
offset=offset,
limit=limit,
clean=clean,
desc=desc,
fields=fields,
omit=omit,
unwind=unwind,
skip_empty=skip_empty,
skip_hidden=skip_hidden,
):
yield item
async def purge(self) -> None:
"""Call `delete()` on the underlying API client while holding the instance lock.

NOTE(review): this issues exactly the same `self._api_client.delete()` call as `drop`,
so the two methods are currently indistinguishable - presumably a placeholder (see the
TODO below). Confirm the intended purge semantics before relying on this method.
"""
# TODO: better
async with self._lock:
await self._api_client.delete()

@override
async def drop(self) -> None:
"""Call `delete()` on the underlying API client while holding the instance lock."""
# The lock is the same one used by the other mutating operations of this client.
async with self._lock:
await self._api_client.delete()

@override
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
"""Push `data` through the API client and then refresh the held metadata.

Args:
    data: A list of items or a single dict; forwarded unchanged as `items` to `push_items`.
"""
# The push is performed while holding the instance lock.
async with self._lock:
await self._api_client.push_items(items=data)
# Re-fetch the metadata after the push; `_update_metadata` reassigns `self._metadata`.
await self._update_metadata()

@override
async def get_items_as_bytes(
async def get_data(
self,
*,
item_format: str = 'json',
offset: int | None = None,
limit: int | None = None,
desc: bool = False,
offset: int = 0,
limit: int | None = 999_999_999_999,
clean: bool = False,
bom: bool = False,
delimiter: str | None = None,
desc: bool = False,
fields: list[str] | None = None,
omit: list[str] | None = None,
unwind: str | None = None,
skip_empty: bool = False,
skip_header_row: bool = False,
skip_hidden: bool = False,
xml_root: str | None = None,
xml_row: str | None = None,
flatten: list[str] | None = None,
) -> bytes:
return await self._client.get_items_as_bytes(
item_format=item_format,
view: str | None = None,
) -> DatasetItemsListPage:
response = await self._api_client.list_items(
offset=offset,
limit=limit,
desc=desc,
clean=clean,
bom=bom,
delimiter=delimiter,
desc=desc,
fields=fields,
omit=omit,
unwind=unwind,
skip_empty=skip_empty,
skip_header_row=skip_header_row,
skip_hidden=skip_hidden,
xml_root=xml_root,
xml_row=xml_row,
flatten=flatten,
view=view,
)
result = DatasetItemsListPage.model_validate(vars(response))
await self._update_metadata()
return result

@override
async def stream_items(
async def iterate_items(
self,
*,
item_format: str = 'json',
offset: int | None = None,
offset: int = 0,
limit: int | None = None,
desc: bool = False,
clean: bool = False,
bom: bool = False,
delimiter: str | None = None,
desc: bool = False,
fields: list[str] | None = None,
omit: list[str] | None = None,
unwind: str | None = None,
skip_empty: bool = False,
skip_header_row: bool = False,
skip_hidden: bool = False,
xml_root: str | None = None,
xml_row: str | None = None,
) -> AbstractAsyncContextManager[Response | None]:
return self._client.stream_items(
item_format=item_format,
) -> AsyncIterator[dict]:
async for item in self._api_client.iterate_items(
offset=offset,
limit=limit,
desc=desc,
clean=clean,
bom=bom,
delimiter=delimiter,
desc=desc,
fields=fields,
omit=omit,
unwind=unwind,
skip_empty=skip_empty,
skip_header_row=skip_header_row,
skip_hidden=skip_hidden,
xml_root=xml_root,
xml_row=xml_row,
)
):
yield item

@override
async def push_items(self, items: JsonSerializable) -> None:
await self._client.push_items(
items=items,
)
await self._update_metadata()

async def _update_metadata(self) -> None:
"""Re-fetch the dataset metadata via the API client and store it on this instance."""
# NOTE(review): the result of `get()` is validated as-is; presumably it can be `None` when the
# dataset no longer exists, in which case validation would fail - confirm and handle if needed.
metadata = await self._api_client.get()
self._metadata = DatasetMetadata.model_validate(metadata)
Loading
Loading