From 37db8ff0f9490b0ed8c481b09ad59db5f5089d8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Tue, 24 Jan 2023 08:30:50 +0100 Subject: [PATCH 1/4] feat: Add documentation for `Dataset`, `KeyValueStore`, and `RequestQueue` --- src/apify/storages/key_value_store.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/apify/storages/key_value_store.py b/src/apify/storages/key_value_store.py index 7780227d..202e64ad 100644 --- a/src/apify/storages/key_value_store.py +++ b/src/apify/storages/key_value_store.py @@ -123,12 +123,10 @@ async def iterate_keys(self, exclusive_start_key: Optional[str] = None) -> Async where `key` is the record key, and `info` is an object that contains a single property `size` indicating size of the record in bytes. """ - index = 0 while True: list_keys = await self._client.list_keys(exclusive_start_key=exclusive_start_key) for item in list_keys['items']: yield IterateKeysTuple(item['key'], {'size': item['size']}) - index += 1 if not list_keys['isTruncated']: break From 72b8883b422fdb4ab3d5e33f9f104d0620c5ae02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Mon, 30 Jan 2023 04:35:42 +0100 Subject: [PATCH 2/4] feat: Various fixes and improvements --- src/apify/_utils.py | 47 +++++++++++++-- src/apify/actor.py | 2 +- src/apify/memory_storage/memory_storage.py | 7 ++- .../resource_clients/request_queue.py | 3 - src/apify/storages/dataset.py | 2 +- src/apify/storages/request_queue.py | 26 +++++++-- .../actor/test_actor_memory_storage_e2e.py | 58 +++++++++++++++++++ tests/unit/conftest.py | 8 +-- .../resource_clients/test_dataset.py | 14 +++-- .../test_dataset_collection.py | 3 - .../resource_clients/test_key_value_store.py | 14 +++-- .../test_key_value_store_collection.py | 4 -- .../resource_clients/test_request_queue.py | 20 +++++-- .../test_request_queue_collection.py | 4 -- .../memory_storage/test_memory_storage.py | 13 +++++ tests/unit/storages/test_request_queue.py | 15 +++++ tests/unit/test_utils.py | 17 ++++++ 17 files changed, 204 insertions(+), 53 deletions(-) create mode 100644 tests/unit/actor/test_actor_memory_storage_e2e.py diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 69bec36f..7f5d89d0 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -19,7 +19,7 @@ from enum import Enum from typing import Any, Callable, Dict, Generic, ItemsView, Iterator, NoReturn, Optional from typing import OrderedDict as OrderedDictType -from typing import Type, TypeVar, Union, ValuesView, cast, overload +from typing import Tuple, Type, TypeVar, Union, ValuesView, cast, overload import aioshutil import psutil @@ -285,12 +285,12 @@ def _is_file_or_bytes(value: Any) -> bool: return isinstance(value, (bytes, bytearray, io.IOBase)) -def _maybe_parse_body(body: bytes, content_type: str) -> Any: # TODO: Improve return type +def _maybe_parse_body(body: bytes, content_type: str) -> Any: try: if _is_content_type_json(content_type): return json.loads(body) # Returns any elif _is_content_type_xml(content_type) or _is_content_type_text(content_type): - return body.decode('utf-8') # TODO: Check if utf-8 can be assumed + return body.decode('utf-8') except ValueError as err: print('_maybe_parse_body error', err) return body @@ -361,12 +361,11 @@ def __setitem__(self, key: str, value: T) -> None: def __delitem__(self, key: str) -> None: """Remove an item from the cache.""" - # TODO: maybe do? 
self._cache.__delitem__(key) del self._cache[key] - def __iter__(self) -> Iterator: + def __iter__(self) -> Iterator[str]: """Iterate over the keys of the cache in order of insertion.""" - yield from self._cache.__iter__() + return self._cache.__iter__() def __len__(self) -> int: """Get the number of items in the cache.""" @@ -383,3 +382,39 @@ def items(self) -> ItemsView[str, T]: # Needed so we don't mutate the cache by def _is_running_in_ipython() -> bool: return getattr(builtins, '__IPYTHON__', False) + + +@overload +def _budget_ow(value: Union[str, int, float, bool], predicate: Tuple[Type, bool], value_name: str) -> None: # noqa: U100 + ... + + +@overload +def _budget_ow(value: Dict, predicate: Dict[str, Tuple[Type, bool]]) -> None: # noqa: U100 + ... + + +def _budget_ow( + value: Union[Dict, str, int, float, bool], + predicate: Union[Dict[str, Tuple[Type, bool]], Tuple[Type, bool]], + value_name: Optional[str] = None, +) -> None: + """Budget version of ow.""" + def validate_single(field_value: Any, expected_type: Type, required: bool, name: str) -> None: + if field_value is None and required: + raise ValueError(f'"{name}" is required!') + actual_type = type(field_value) + if (field_value is not None or required) and actual_type != expected_type: + raise ValueError(f'"{name}" must be of type "{expected_type.__name__}" but it is "{actual_type.__name__}"!') + + # Validate object + if isinstance(value, dict) and isinstance(predicate, dict): + for key, (field_type, required) in predicate.items(): + field_value = value.get(key) + validate_single(field_value, field_type, required, key) + # Validate "primitive" + elif isinstance(value, (int, str, float, bool)) and isinstance(predicate, tuple) and value_name is not None: + field_type, required = predicate + validate_single(value, field_type, required, value_name) + else: + raise ValueError('Wrong input!') diff --git a/src/apify/actor.py b/src/apify/actor.py index 7f8f1879..8cc3432c 100644 --- a/src/apify/actor.py +++ b/src/apify/actor.py @@ -570,7 +570,7 @@ async def _push_data_internal(self, data: Any) -> None: if not data: return - if not isinstance(data, list): # TODO: Memory storage does this on its own... + if not isinstance(data, list): data = [data] dataset = await self.open_dataset() diff --git a/src/apify/memory_storage/memory_storage.py b/src/apify/memory_storage/memory_storage.py index e89f0676..7c161f78 100644 --- a/src/apify/memory_storage/memory_storage.py +++ b/src/apify/memory_storage/memory_storage.py @@ -6,6 +6,7 @@ from aiofiles import ospath from aiofiles.os import rename, scandir +from ..consts import ApifyEnvVars from .resource_clients.dataset import DatasetClient from .resource_clients.dataset_collection import DatasetCollectionClient from .resource_clients.key_value_store import KeyValueStoreClient @@ -38,7 +39,7 @@ class MemoryStorage: """Indicates whether a purge was already performed on this instance""" def __init__( - self, *, local_data_directory: str = './storage', write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None, + self, *, local_data_directory: Optional[str] = None, write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None, ) -> None: """Initialize the MemoryStorage. 
@@ -47,13 +48,13 @@ def __init__( persist_storage (bool, optional): Whether to persist the data to the `local_data_directory` or just keep them in memory write_metadata (bool, optional): Whether to persist metadata of the storages as well """ - self._local_data_directory = local_data_directory # TODO: Make this work with `APIFY_LOCAL_STORAGE_DIR` + self._local_data_directory = local_data_directory or os.getenv(ApifyEnvVars.LOCAL_STORAGE_DIR.value) or './storage' self._datasets_directory = os.path.join(self._local_data_directory, 'datasets') self._key_value_stores_directory = os.path.join(self._local_data_directory, 'key_value_stores') self._request_queues_directory = os.path.join(self._local_data_directory, 'request_queues') self._write_metadata = write_metadata if write_metadata is not None else '*' in os.getenv('DEBUG', '') self._persist_storage = persist_storage if persist_storage is not None else not any( - os.getenv('APIFY_PERSIST_STORAGE', 'true') == s for s in ['false', '0', '']) + os.getenv(ApifyEnvVars.PERSIST_STORAGE.value, 'true') == s for s in ['false', '0', '']) self._datasets_handled = [] self._key_value_stores_handled = [] self._request_queues_handled = [] diff --git a/src/apify/memory_storage/resource_clients/request_queue.py b/src/apify/memory_storage/resource_clients/request_queue.py index 37a0ee2c..4a860232 100644 --- a/src/apify/memory_storage/resource_clients/request_queue.py +++ b/src/apify/memory_storage/resource_clients/request_queue.py @@ -154,7 +154,6 @@ async def add_request(self, request: Dict, *, forefront: Optional[bool] = None) Returns: dict: The added request. """ - # TODO: Throw if uniqueKey or url missing from request dict, also do for update_request... existing_queue_by_id = _find_or_cache_request_queue_by_possible_id(self._client, self._name or self._id) if existing_queue_by_id is None: @@ -175,7 +174,6 @@ async def add_request(self, request: Dict, *, forefront: Optional[bool] = None) } existing_queue_by_id._requests[request_model['id']] = request_model - # TODO: Validate the next line logic, seems wrong in crawlee existing_queue_by_id._pending_request_count += 0 if request_model['orderNo'] is None else 1 await existing_queue_by_id._update_timestamps(True) await _update_request_queue_item( @@ -248,7 +246,6 @@ async def update_request(self, request: Dict, *, forefront: Optional[bool] = Non request_was_handled_before_update = existing_request['orderNo'] is None # We add 1 pending request if previous state was handled - # TODO: Validate the next 2 lines logic, seems wrong in crawlee if is_request_handled_state_changing: pending_count_adjustment = 1 if request_was_handled_before_update else -1 diff --git a/src/apify/storages/dataset.py b/src/apify/storages/dataset.py index 78a8a605..a6f3d966 100644 --- a/src/apify/storages/dataset.py +++ b/src/apify/storages/dataset.py @@ -242,7 +242,7 @@ async def export_to( if content_type == 'text/csv': output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) # TODO: Compare quoting behavior with TS impl + writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) writer.writerows([items[0].keys(), *[item.values() for item in items]]) value = output.getvalue() return await key_value_store.set_value(key, value, content_type) diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index 1e8ea51f..64b3e2c7 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -10,7 +10,7 @@ from apify_client import ApifyClientAsync from 
apify_client.clients import RequestQueueClientAsync -from .._utils import LRUCache, _crypto_random_object_id, _unique_key_to_request_id +from .._utils import LRUCache, _budget_ow, _crypto_random_object_id, _unique_key_to_request_id from ..config import Configuration from ..consts import REQUEST_QUEUE_HEAD_MAX_LIMIT from ..memory_storage import MemoryStorage @@ -126,7 +126,7 @@ async def _create_instance(cls, request_queue_id_or_name: str, client: Union[Api def _get_default_name(cls, config: Configuration) -> str: return config.default_request_queue_id - async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict: # TODO: Validate request with pydantic + async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict: """Add a request to the queue. Args: @@ -136,8 +136,14 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict: Returns: dict: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`. """ + _budget_ow(request, { + 'url': (str, True), + }) self._last_activity = datetime.utcnow() + if request.get('uniqueKey') is None: + request['uniqueKey'] = request['url'] # TODO: Check Request class in crawlee and replicate uniqueKey generation logic... + cache_key = _unique_key_to_request_id(request['uniqueKey']) cached_info = self._requests_cache.get(cache_key) @@ -174,7 +180,8 @@ async def get_request(self, request_id: str) -> Optional[Dict]: Returns: dict, optional: The retrieved request, or `None`, if it does not exist. """ - return await self._client.get_request(request_id) # TODO: Maybe create a Request class? + _budget_ow(request_id, (str, True), 'request_id') + return await self._client.get_request(request_id) # TODO: Maybe create a Request dataclass? async def fetch_next_request(self) -> Optional[Dict]: """Return the next request in the queue to be processed. @@ -241,7 +248,7 @@ async def fetch_next_request(self) -> Optional[Dict]: return request - async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TODO: Validate request with pydantic + async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: """Mark a request as handled after successful processing. Handled requests will never again be returned by the `RequestQueue.fetch_next_request` method. @@ -253,6 +260,11 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TOD dict, optional: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`. `None` if the given request was not in progress. """ + _budget_ow(request, { + 'id': (str, True), + 'uniqueKey': (str, True), + 'handledAt': (datetime, False), + }) self._last_activity = datetime.utcnow() if request['id'] not in self._in_progress: logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!') @@ -272,7 +284,7 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TOD return queue_operation_info - async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]: # TODO: Validate request with pydantic + async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]: """Reclaim a failed request back to the queue. 
The request will be returned for processing later again @@ -285,6 +297,10 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio dict, optional: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`. `None` if the given request was not in progress. """ + _budget_ow(request, { + 'id': (str, True), + 'uniqueKey': (str, True), + }) self._last_activity = datetime.utcnow() if request['id'] not in self._in_progress: diff --git a/tests/unit/actor/test_actor_memory_storage_e2e.py b/tests/unit/actor/test_actor_memory_storage_e2e.py new file mode 100644 index 00000000..6429d282 --- /dev/null +++ b/tests/unit/actor/test_actor_memory_storage_e2e.py @@ -0,0 +1,58 @@ +import pytest + +from apify import Actor +from apify.config import Configuration +from apify.consts import ApifyEnvVars +from apify.memory_storage import MemoryStorage +from apify.storage_client_manager import StorageClientManager +from apify.storages import StorageManager + + +async def run_e2e_test(monkeypatch: pytest.MonkeyPatch, tmp_path: str, purge_on_start: bool = True) -> None: + # Configure purging env var + monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START.value, 'true' if purge_on_start else 'false') + # Store old storage client so we have the object reference for comparison + old_client = StorageClientManager.get_storage_client() + async with Actor: + old_default_kvs = await Actor.open_key_value_store() + old_non_default_kvs = await Actor.open_key_value_store('non-default') + # Create data in default and non-default key-value store + await old_default_kvs.set_value('test', 'default value') + await old_non_default_kvs.set_value('test', 'non-default value') + + # Clean up singletons and mock a new memory storage + monkeypatch.setattr(Actor, '_default_instance', None) + monkeypatch.setattr(Configuration, '_default_instance', None) + monkeypatch.setattr(StorageManager, '_default_instance', None) + monkeypatch.setattr(StorageClientManager, '_default_instance', None) + + new_patched_memory_storage = MemoryStorage(local_data_directory=tmp_path) + + def get_storage_client() -> 'MemoryStorage': + return new_patched_memory_storage + monkeypatch.setattr(StorageClientManager, 'get_storage_client', get_storage_client) + + # We simulate another clean run, we expect the memory storage to read from the local data directory + # Default storages are purged based on purge_on_start parameter. + async with Actor: + # Check if we're using a different memory storage instance + assert old_client is not StorageClientManager.get_storage_client() + default_kvs = await Actor.open_key_value_store() + assert default_kvs is not old_default_kvs + non_default_kvs = await Actor.open_key_value_store('non-default') + assert non_default_kvs is not old_non_default_kvs + default_value = await default_kvs.get_value('test') + non_default_value = await non_default_kvs.get_value('test') + if purge_on_start: + assert default_value is None + else: + assert default_value == 'default value' + assert non_default_value == 'non-default value' + + +async def test_actor_memory_storage_e2e(monkeypatch: pytest.MonkeyPatch, tmp_path: str) -> None: + """This test simulates two clean runs using memory storage. + The second run attempts to access data created by the first one. 
+ We run 2 configurations with different `purge_on_start`.""" + await run_e2e_test(monkeypatch, tmp_path, purge_on_start=True) + await run_e2e_test(monkeypatch, tmp_path, purge_on_start=False) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 26ec12aa..019bb778 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,7 +1,7 @@ import asyncio import inspect from collections import defaultdict -from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, get_type_hints +from typing import Any, Callable, Dict, List, Optional, Tuple, get_type_hints import pytest @@ -121,7 +121,5 @@ def apify_client_async_patcher(monkeypatch: pytest.MonkeyPatch) -> ApifyClientAs @pytest.fixture() -async def memory_storage(tmp_path: str) -> AsyncIterator[MemoryStorage]: - ms = MemoryStorage(local_data_directory=tmp_path, write_metadata=True) # TODO: Remove write_metadata=True as it's not the default setting... - yield ms - await ms.purge() # TODO: Do we want this here? there are unit tests specifically for purge +def memory_storage(tmp_path: str) -> MemoryStorage: + return MemoryStorage(local_data_directory=tmp_path) diff --git a/tests/unit/memory_storage/resource_clients/test_dataset.py b/tests/unit/memory_storage/resource_clients/test_dataset.py index 65181576..dcf7e7a7 100644 --- a/tests/unit/memory_storage/resource_clients/test_dataset.py +++ b/tests/unit/memory_storage/resource_clients/test_dataset.py @@ -38,15 +38,16 @@ async def test_get(dataset_client: DatasetClient) -> None: async def test_update(dataset_client: DatasetClient) -> None: new_dataset_name = 'test-update' + await dataset_client.push_items({'abc': 123}) old_dataset_info = await dataset_client.get() assert old_dataset_info is not None old_dataset_directory = os.path.join(dataset_client._client._datasets_directory, old_dataset_info['name']) new_dataset_directory = os.path.join(dataset_client._client._datasets_directory, new_dataset_name) - assert os.path.exists(os.path.join(old_dataset_directory, '__metadata__.json')) is True - assert os.path.exists(os.path.join(new_dataset_directory, '__metadata__.json')) is False + assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is True + assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is False updated_dataset_info = await dataset_client.update(name=new_dataset_name) - assert os.path.exists(os.path.join(old_dataset_directory, '__metadata__.json')) is False - assert os.path.exists(os.path.join(new_dataset_directory, '__metadata__.json')) is True + assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is False + assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is True # Only modifiedAt and accessedAt should be different assert old_dataset_info['createdAt'] == updated_dataset_info['createdAt'] assert old_dataset_info['modifiedAt'] != updated_dataset_info['modifiedAt'] @@ -57,12 +58,13 @@ async def test_update(dataset_client: DatasetClient) -> None: async def test_delete(dataset_client: DatasetClient) -> None: + await dataset_client.push_items({'abc': 123}) dataset_info = await dataset_client.get() assert dataset_info is not None dataset_directory = os.path.join(dataset_client._client._datasets_directory, dataset_info['name']) - assert os.path.exists(os.path.join(dataset_directory, '__metadata__.json')) is True + assert os.path.exists(os.path.join(dataset_directory, '000000001.json')) is True await dataset_client.delete() - assert 
os.path.exists(os.path.join(dataset_directory, '__metadata__.json')) is False + assert os.path.exists(os.path.join(dataset_directory, '000000001.json')) is False # Does not crash when called again await dataset_client.delete() diff --git a/tests/unit/memory_storage/resource_clients/test_dataset_collection.py b/tests/unit/memory_storage/resource_clients/test_dataset_collection.py index 08def367..748a1db7 100644 --- a/tests/unit/memory_storage/resource_clients/test_dataset_collection.py +++ b/tests/unit/memory_storage/resource_clients/test_dataset_collection.py @@ -1,4 +1,3 @@ -import os import pytest @@ -14,10 +13,8 @@ def datasets_client(memory_storage: MemoryStorage) -> DatasetCollectionClient: async def test_get_or_create(datasets_client: DatasetCollectionClient) -> None: dataset_name = 'test' # A new dataset gets created - assert os.path.exists(os.path.join(datasets_client._datasets_directory, dataset_name, '__metadata__.json')) is False dataset_info = await datasets_client.get_or_create(name=dataset_name) assert dataset_info['name'] == dataset_name - assert os.path.exists(os.path.join(datasets_client._datasets_directory, dataset_name, '__metadata__.json')) is True # Another get_or_create call returns the same dataset dataset_info_existing = await datasets_client.get_or_create(name=dataset_name) assert dataset_info['id'] == dataset_info_existing['id'] diff --git a/tests/unit/memory_storage/resource_clients/test_key_value_store.py b/tests/unit/memory_storage/resource_clients/test_key_value_store.py index 48c16829..10d20f5d 100644 --- a/tests/unit/memory_storage/resource_clients/test_key_value_store.py +++ b/tests/unit/memory_storage/resource_clients/test_key_value_store.py @@ -40,15 +40,16 @@ async def test_get(key_value_store_client: KeyValueStoreClient) -> None: async def test_update(key_value_store_client: KeyValueStoreClient) -> None: new_kvs_name = 'test-update' + await key_value_store_client.set_record('test', {'abc': 123}) old_kvs_info = await key_value_store_client.get() assert old_kvs_info is not None old_kvs_directory = os.path.join(key_value_store_client._client._key_value_stores_directory, old_kvs_info['name']) new_kvs_directory = os.path.join(key_value_store_client._client._key_value_stores_directory, new_kvs_name) - assert os.path.exists(os.path.join(old_kvs_directory, '__metadata__.json')) is True - assert os.path.exists(os.path.join(new_kvs_directory, '__metadata__.json')) is False + assert os.path.exists(os.path.join(old_kvs_directory, 'test.json')) is True + assert os.path.exists(os.path.join(new_kvs_directory, 'test.json')) is False updated_kvs_info = await key_value_store_client.update(name=new_kvs_name) - assert os.path.exists(os.path.join(old_kvs_directory, '__metadata__.json')) is False - assert os.path.exists(os.path.join(new_kvs_directory, '__metadata__.json')) is True + assert os.path.exists(os.path.join(old_kvs_directory, 'test.json')) is False + assert os.path.exists(os.path.join(new_kvs_directory, 'test.json')) is True # Only modifiedAt and accessedAt should be different assert old_kvs_info['createdAt'] == updated_kvs_info['createdAt'] assert old_kvs_info['modifiedAt'] != updated_kvs_info['modifiedAt'] @@ -59,12 +60,13 @@ async def test_update(key_value_store_client: KeyValueStoreClient) -> None: async def test_delete(key_value_store_client: KeyValueStoreClient) -> None: + await key_value_store_client.set_record('test', {'abc': 123}) kvs_info = await key_value_store_client.get() assert kvs_info is not None kvs_directory = 
os.path.join(key_value_store_client._client._key_value_stores_directory, kvs_info['name']) - assert os.path.exists(os.path.join(kvs_directory, '__metadata__.json')) is True + assert os.path.exists(os.path.join(kvs_directory, 'test.json')) is True await key_value_store_client.delete() - assert os.path.exists(os.path.join(kvs_directory, '__metadata__.json')) is False + assert os.path.exists(os.path.join(kvs_directory, 'test.json')) is False # Does not crash when called again await key_value_store_client.delete() diff --git a/tests/unit/memory_storage/resource_clients/test_key_value_store_collection.py b/tests/unit/memory_storage/resource_clients/test_key_value_store_collection.py index 18b99492..e32474e4 100644 --- a/tests/unit/memory_storage/resource_clients/test_key_value_store_collection.py +++ b/tests/unit/memory_storage/resource_clients/test_key_value_store_collection.py @@ -1,5 +1,3 @@ -import os - import pytest from apify.memory_storage import MemoryStorage @@ -14,10 +12,8 @@ def key_value_stores_client(memory_storage: MemoryStorage) -> KeyValueStoreColle async def test_get_or_create(key_value_stores_client: KeyValueStoreCollectionClient) -> None: kvs_name = 'test' # A new kvs gets created - assert os.path.exists(os.path.join(key_value_stores_client._key_value_stores_directory, kvs_name, '__metadata__.json')) is False kvs_info = await key_value_stores_client.get_or_create(name=kvs_name) assert kvs_info['name'] == kvs_name - assert os.path.exists(os.path.join(key_value_stores_client._key_value_stores_directory, kvs_name, '__metadata__.json')) is True # Another get_or_create call returns the same kvs kvs_info_existing = await key_value_stores_client.get_or_create(name=kvs_name) assert kvs_info['id'] == kvs_info_existing['id'] diff --git a/tests/unit/memory_storage/resource_clients/test_request_queue.py b/tests/unit/memory_storage/resource_clients/test_request_queue.py index b0b75781..68c2e7da 100644 --- a/tests/unit/memory_storage/resource_clients/test_request_queue.py +++ b/tests/unit/memory_storage/resource_clients/test_request_queue.py @@ -31,15 +31,19 @@ async def test_get(request_queue_client: RequestQueueClient) -> None: async def test_update(request_queue_client: RequestQueueClient) -> None: new_rq_name = 'test-update' + await request_queue_client.add_request({ + 'uniqueKey': 'https://apify.com', + 'url': 'https://apify.com', + }) old_rq_info = await request_queue_client.get() assert old_rq_info is not None old_rq_directory = os.path.join(request_queue_client._client._request_queues_directory, old_rq_info['name']) new_rq_directory = os.path.join(request_queue_client._client._request_queues_directory, new_rq_name) - assert os.path.exists(os.path.join(old_rq_directory, '__metadata__.json')) is True - assert os.path.exists(os.path.join(new_rq_directory, '__metadata__.json')) is False + assert os.path.exists(os.path.join(old_rq_directory, 'fvwscO2UJLdr10B.json')) is True + assert os.path.exists(os.path.join(new_rq_directory, 'fvwscO2UJLdr10B.json')) is False updated_rq_info = await request_queue_client.update(name=new_rq_name) - assert os.path.exists(os.path.join(old_rq_directory, '__metadata__.json')) is False - assert os.path.exists(os.path.join(new_rq_directory, '__metadata__.json')) is True + assert os.path.exists(os.path.join(old_rq_directory, 'fvwscO2UJLdr10B.json')) is False + assert os.path.exists(os.path.join(new_rq_directory, 'fvwscO2UJLdr10B.json')) is True # Only modifiedAt and accessedAt should be different assert old_rq_info['createdAt'] == updated_rq_info['createdAt'] 
assert old_rq_info['modifiedAt'] != updated_rq_info['modifiedAt'] @@ -50,12 +54,16 @@ async def test_update(request_queue_client: RequestQueueClient) -> None: async def test_delete(request_queue_client: RequestQueueClient) -> None: + await request_queue_client.add_request({ + 'uniqueKey': 'https://apify.com', + 'url': 'https://apify.com', + }) rq_info = await request_queue_client.get() assert rq_info is not None rq_directory = os.path.join(request_queue_client._client._request_queues_directory, rq_info['name']) - assert os.path.exists(os.path.join(rq_directory, '__metadata__.json')) is True + assert os.path.exists(os.path.join(rq_directory, 'fvwscO2UJLdr10B.json')) is True await request_queue_client.delete() - assert os.path.exists(os.path.join(rq_directory, '__metadata__.json')) is False + assert os.path.exists(os.path.join(rq_directory, 'fvwscO2UJLdr10B.json')) is False # Does not crash when called again await request_queue_client.delete() diff --git a/tests/unit/memory_storage/resource_clients/test_request_queue_collection.py b/tests/unit/memory_storage/resource_clients/test_request_queue_collection.py index c072e163..19bcf1d8 100644 --- a/tests/unit/memory_storage/resource_clients/test_request_queue_collection.py +++ b/tests/unit/memory_storage/resource_clients/test_request_queue_collection.py @@ -1,5 +1,3 @@ -import os - import pytest from apify.memory_storage import MemoryStorage @@ -14,10 +12,8 @@ def request_queues_client(memory_storage: MemoryStorage) -> RequestQueueCollecti async def test_get_or_create(request_queues_client: RequestQueueCollectionClient) -> None: rq_name = 'test' # A new request queue gets created - assert os.path.exists(os.path.join(request_queues_client._request_queues_directory, rq_name, '__metadata__.json')) is False rq_info = await request_queues_client.get_or_create(name=rq_name) assert rq_info['name'] == rq_name - assert os.path.exists(os.path.join(request_queues_client._request_queues_directory, rq_name, '__metadata__.json')) is True # Another get_or_create call returns the same request queue rq_existing = await request_queues_client.get_or_create(name=rq_name) assert rq_info['id'] == rq_existing['id'] diff --git a/tests/unit/memory_storage/test_memory_storage.py b/tests/unit/memory_storage/test_memory_storage.py index 44470e54..d2956638 100644 --- a/tests/unit/memory_storage/test_memory_storage.py +++ b/tests/unit/memory_storage/test_memory_storage.py @@ -2,6 +2,7 @@ import pytest +from apify.consts import ApifyEnvVars from apify.memory_storage import MemoryStorage @@ -128,3 +129,15 @@ async def test_not_implemented_method(tmp_path: str) -> None: with pytest.raises(NotImplementedError): await ddt.stream_items(item_format='json') + + +async def test_storage_path_configuration(monkeypatch: pytest.MonkeyPatch) -> None: + default_ms = MemoryStorage() + assert default_ms._local_data_directory == './storage' + # We expect the env var to override the default value + monkeypatch.setenv(ApifyEnvVars.LOCAL_STORAGE_DIR, './env_var_storage_dir') + env_var_ms = MemoryStorage() + assert env_var_ms._local_data_directory == './env_var_storage_dir' + # We expect the parametrized value to override the env var + parametrized_ms = MemoryStorage(local_data_directory='./parametrized_storage_dir') + assert parametrized_ms._local_data_directory == './parametrized_storage_dir' diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 174b1724..b59185af 100644 --- a/tests/unit/storages/test_request_queue.py +++ 
b/tests/unit/storages/test_request_queue.py
@@ -1,4 +1,5 @@
 import asyncio
+from datetime import datetime
 
 import pytest
 
@@ -27,9 +28,22 @@ async def test_drop() -> None:
     assert rq1 is not rq2
 
 
+async def test_get_request(request_queue: RequestQueue) -> None:
+    url = 'https://example.com'
+    add_request_info = await request_queue.add_request({
+        'uniqueKey': url,
+        'url': url,
+    })
+    request = await request_queue.get_request(add_request_info['requestId'])
+    assert request is not None
+    assert request['url'] == url
+
+
 async def test_add_fetch_handle_request(request_queue: RequestQueue) -> None:
     url = 'https://example.com'
     assert await request_queue.is_empty() is True
+    with pytest.raises(ValueError, match='"url" is required'):
+        await request_queue.add_request({})
     add_request_info = await request_queue.add_request({
         'uniqueKey': url,
         'url': url,
@@ -41,6 +55,7 @@ async def test_add_fetch_handle_request(request_queue: RequestQueue) -> None:
     next = await request_queue.fetch_next_request()
     assert next is not None
     # Mark it as handled
+    next['handledAt'] = datetime.utcnow()
     queue_operation_info = await request_queue.mark_request_as_handled(next)
     assert queue_operation_info is not None
     assert queue_operation_info['uniqueKey'] == url
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index 116cf525..e266571e 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -9,6 +9,7 @@
 from aiofiles.os import mkdir
 
 from apify._utils import (
+    _budget_ow,
     _crypto_random_object_id,
     _fetch_and_parse_env_var,
     _filter_out_none_values_recursively,
@@ -323,3 +324,19 @@ def test__crypto_random_object_id() -> None:
     long_random_object_id = _crypto_random_object_id(1000)
     for char in long_random_object_id:
         assert char in 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789'
+
+
+def test__budget_ow() -> None:
+    _budget_ow({
+        'a': 123,
+        'b': 'string',
+        'c': datetime.datetime.utcnow(),
+    }, {
+        'a': (int, True),
+        'b': (str, False),
+        'c': (datetime.datetime, True),
+    })
+    with pytest.raises(ValueError, match='required'):
+        _budget_ow({}, {'id': (str, True)})
+    with pytest.raises(ValueError, match='must be of type'):
+        _budget_ow({'id': 123}, {'id': (str, True)})

From 8fbfccbd5c826005b238bb78d4602eeed6982ea8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= 
Date: Mon, 30 Jan 2023 20:03:30 +0100
Subject: [PATCH 3/4] simplify getting env var names

---
 src/apify/memory_storage/memory_storage.py        | 4 ++--
 tests/unit/actor/test_actor_memory_storage_e2e.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/apify/memory_storage/memory_storage.py b/src/apify/memory_storage/memory_storage.py
index 7c161f78..793feaad 100644
--- a/src/apify/memory_storage/memory_storage.py
+++ b/src/apify/memory_storage/memory_storage.py
@@ -48,13 +48,13 @@ def __init__(
             persist_storage (bool, optional): Whether to persist the data to the `local_data_directory` or just keep them in memory
             write_metadata (bool, optional): Whether to persist metadata of the storages as well
         """
-        self._local_data_directory = local_data_directory or os.getenv(ApifyEnvVars.LOCAL_STORAGE_DIR.value) or './storage'
+        self._local_data_directory = local_data_directory or os.getenv(ApifyEnvVars.LOCAL_STORAGE_DIR) or './storage'
         self._datasets_directory = os.path.join(self._local_data_directory, 'datasets')
         self._key_value_stores_directory = os.path.join(self._local_data_directory, 'key_value_stores')
         self._request_queues_directory = os.path.join(self._local_data_directory, 
'request_queues') self._write_metadata = write_metadata if write_metadata is not None else '*' in os.getenv('DEBUG', '') self._persist_storage = persist_storage if persist_storage is not None else not any( - os.getenv(ApifyEnvVars.PERSIST_STORAGE.value, 'true') == s for s in ['false', '0', '']) + os.getenv(ApifyEnvVars.PERSIST_STORAGE, 'true') == s for s in ['false', '0', '']) self._datasets_handled = [] self._key_value_stores_handled = [] self._request_queues_handled = [] diff --git a/tests/unit/actor/test_actor_memory_storage_e2e.py b/tests/unit/actor/test_actor_memory_storage_e2e.py index 6429d282..521fd95c 100644 --- a/tests/unit/actor/test_actor_memory_storage_e2e.py +++ b/tests/unit/actor/test_actor_memory_storage_e2e.py @@ -10,7 +10,7 @@ async def run_e2e_test(monkeypatch: pytest.MonkeyPatch, tmp_path: str, purge_on_start: bool = True) -> None: # Configure purging env var - monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START.value, 'true' if purge_on_start else 'false') + monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, 'true' if purge_on_start else 'false') # Store old storage client so we have the object reference for comparison old_client = StorageClientManager.get_storage_client() async with Actor: From 4fb6965a444886dfd51d93880acfe6e9325da7e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Tue, 31 Jan 2023 09:44:26 +0100 Subject: [PATCH 4/4] address PR comments --- src/apify/_utils.py | 5 ++--- src/apify/memory_storage/memory_storage.py | 4 ++-- tests/unit/actor/test_actor_memory_storage_e2e.py | 14 +++++--------- tests/unit/test_utils.py | 7 +++++++ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 7f5d89d0..b9eb4882 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -403,9 +403,8 @@ def _budget_ow( def validate_single(field_value: Any, expected_type: Type, required: bool, name: str) -> None: if field_value is None and required: raise ValueError(f'"{name}" is required!') - actual_type = type(field_value) - if (field_value is not None or required) and actual_type != expected_type: - raise ValueError(f'"{name}" must be of type "{expected_type.__name__}" but it is "{actual_type.__name__}"!') + if (field_value is not None or required) and not isinstance(field_value, expected_type): + raise ValueError(f'"{name}" must be of type "{expected_type.__name__}" but it is "{type(field_value).__name__}"!') # Validate object if isinstance(value, dict) and isinstance(predicate, dict): diff --git a/src/apify/memory_storage/memory_storage.py b/src/apify/memory_storage/memory_storage.py index 793feaad..41ec0b50 100644 --- a/src/apify/memory_storage/memory_storage.py +++ b/src/apify/memory_storage/memory_storage.py @@ -6,6 +6,7 @@ from aiofiles import ospath from aiofiles.os import rename, scandir +from .._utils import _maybe_parse_bool from ..consts import ApifyEnvVars from .resource_clients.dataset import DatasetClient from .resource_clients.dataset_collection import DatasetCollectionClient @@ -53,8 +54,7 @@ def __init__( self._key_value_stores_directory = os.path.join(self._local_data_directory, 'key_value_stores') self._request_queues_directory = os.path.join(self._local_data_directory, 'request_queues') self._write_metadata = write_metadata if write_metadata is not None else '*' in os.getenv('DEBUG', '') - self._persist_storage = persist_storage if persist_storage is not None else not any( - os.getenv(ApifyEnvVars.PERSIST_STORAGE, 'true') == s for s in ['false', '0', '']) + 
self._persist_storage = persist_storage if persist_storage is not None else _maybe_parse_bool(os.getenv(ApifyEnvVars.PERSIST_STORAGE, 'true')) self._datasets_handled = [] self._key_value_stores_handled = [] self._request_queues_handled = [] diff --git a/tests/unit/actor/test_actor_memory_storage_e2e.py b/tests/unit/actor/test_actor_memory_storage_e2e.py index 521fd95c..65ca4f3a 100644 --- a/tests/unit/actor/test_actor_memory_storage_e2e.py +++ b/tests/unit/actor/test_actor_memory_storage_e2e.py @@ -8,7 +8,11 @@ from apify.storages import StorageManager -async def run_e2e_test(monkeypatch: pytest.MonkeyPatch, tmp_path: str, purge_on_start: bool = True) -> None: +@pytest.mark.parametrize('purge_on_start', [True, False]) +async def test_actor_memory_storage_e2e(monkeypatch: pytest.MonkeyPatch, tmp_path: str, purge_on_start: bool) -> None: + """This test simulates two clean runs using memory storage. + The second run attempts to access data created by the first one. + We run 2 configurations with different `purge_on_start`.""" # Configure purging env var monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, 'true' if purge_on_start else 'false') # Store old storage client so we have the object reference for comparison @@ -48,11 +52,3 @@ def get_storage_client() -> 'MemoryStorage': else: assert default_value == 'default value' assert non_default_value == 'non-default value' - - -async def test_actor_memory_storage_e2e(monkeypatch: pytest.MonkeyPatch, tmp_path: str) -> None: - """This test simulates two clean runs using memory storage. - The second run attempts to access data created by the first one. - We run 2 configurations with different `purge_on_start`.""" - await run_e2e_test(monkeypatch, tmp_path, purge_on_start=True) - await run_e2e_test(monkeypatch, tmp_path, purge_on_start=False) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index e266571e..bdcde346 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -3,6 +3,7 @@ import io import os import uuid +from collections import OrderedDict from enum import Enum import pytest @@ -340,3 +341,9 @@ def test__budget_ow() -> None: _budget_ow({}, {'id': (str, True)}) with pytest.raises(ValueError, match='must be of type'): _budget_ow({'id': 123}, {'id': (str, True)}) + # Check if subclasses pass the check + _budget_ow({ + 'ordered_dict': OrderedDict(), + }, { + 'ordered_dict': (dict, False), + })
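
Note for reviewers: the load-bearing piece of patches 2 and 4 is the new `_budget_ow`
helper, which validates either a single "primitive" value against a `(type, required)`
tuple, or a dict against a mapping of key to `(type, required)`. A minimal sketch of the
intended call pattern follows. It is only a sketch: it assumes the private `apify._utils`
import path used by the unit tests above, and the sample values are illustrative, not
taken from the test suite.

    from datetime import datetime

    from apify._utils import _budget_ow  # private helper; import path as in tests/unit/test_utils.py

    # A "primitive" value: the predicate is a (type, required) tuple and the
    # third argument names the value in error messages.
    _budget_ow('abc123', (str, True), 'request_id')

    # A dict: the predicate maps each key to a (type, required) tuple.
    # Optional keys ('handledAt' here) are validated only when present;
    # missing required keys raise ValueError.
    _budget_ow(
        {'id': 'abc123', 'uniqueKey': 'https://example.com', 'handledAt': datetime.utcnow()},
        {
            'id': (str, True),
            'uniqueKey': (str, True),
            'handledAt': (datetime, False),
        },
    )

    # A failed check raises ValueError. After patch 4 the comparison is
    # isinstance()-based, so subclasses pass (e.g. an OrderedDict where
    # dict is expected).
    try:
        _budget_ow({'id': 123}, {'id': (str, True)})
    except ValueError as err:
        print(err)  # "id" must be of type "str" but it is "int"!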
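Similarly, patches 2-4 settle the configuration precedence for `MemoryStorage`: an
explicit constructor argument wins over the corresponding environment variable, which
wins over the built-in default. A minimal sketch of that behavior, assuming
`APIFY_LOCAL_STORAGE_DIR` is the value of `ApifyEnvVars.LOCAL_STORAGE_DIR` (per the
shown code, the constructor only records the paths, it does not create them):

    import os

    from apify.memory_storage import MemoryStorage

    os.environ['APIFY_LOCAL_STORAGE_DIR'] = './env_var_storage_dir'

    # The env var overrides the './storage' default...
    assert MemoryStorage()._local_data_directory == './env_var_storage_dir'
    # ...and the explicit argument overrides the env var.
    assert MemoryStorage(local_data_directory='./my_dir')._local_data_directory == './my_dir'

    del os.environ['APIFY_LOCAL_STORAGE_DIR']
    assert MemoryStorage()._local_data_directory == './storage'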