feat: Various fixes and improvements #41

Merged: 5 commits, Jan 31, 2023
46 changes: 40 additions & 6 deletions src/apify/_utils.py
@@ -19,7 +19,7 @@
from enum import Enum
from typing import Any, Callable, Dict, Generic, ItemsView, Iterator, NoReturn, Optional
from typing import OrderedDict as OrderedDictType
from typing import Type, TypeVar, Union, ValuesView, cast, overload
from typing import Tuple, Type, TypeVar, Union, ValuesView, cast, overload

import aioshutil
import psutil
@@ -285,12 +285,12 @@ def _is_file_or_bytes(value: Any) -> bool:
return isinstance(value, (bytes, bytearray, io.IOBase))


def _maybe_parse_body(body: bytes, content_type: str) -> Any: # TODO: Improve return type
def _maybe_parse_body(body: bytes, content_type: str) -> Any:
try:
if _is_content_type_json(content_type):
return json.loads(body) # Returns any
elif _is_content_type_xml(content_type) or _is_content_type_text(content_type):
return body.decode('utf-8') # TODO: Check if utf-8 can be assumed
return body.decode('utf-8')
except ValueError as err:
print('_maybe_parse_body error', err)
return body
@@ -361,12 +361,11 @@ def __setitem__(self, key: str, value: T) -> None:

def __delitem__(self, key: str) -> None:
"""Remove an item from the cache."""
# TODO: maybe do? self._cache.__delitem__(key)
del self._cache[key]

def __iter__(self) -> Iterator:
def __iter__(self) -> Iterator[str]:
"""Iterate over the keys of the cache in order of insertion."""
yield from self._cache.__iter__()
return self._cache.__iter__()

def __len__(self) -> int:
"""Get the number of items in the cache."""
@@ -383,3 +382,38 @@ def items(self) -> ItemsView[str, T]: # Needed so we don't mutate the cache by

def _is_running_in_ipython() -> bool:
return getattr(builtins, '__IPYTHON__', False)


@overload
def _budget_ow(value: Union[str, int, float, bool], predicate: Tuple[Type, bool], value_name: str) -> None: # noqa: U100
Member comment: 😄 😄 great name

...


@overload
def _budget_ow(value: Dict, predicate: Dict[str, Tuple[Type, bool]]) -> None: # noqa: U100
...


def _budget_ow(
Member comment: I guess there should be some dynamic type checker for Python. But I like the name, maybe you should make a package out of it 😄

value: Union[Dict, str, int, float, bool],
predicate: Union[Dict[str, Tuple[Type, bool]], Tuple[Type, bool]],
value_name: Optional[str] = None,
) -> None:
"""Budget version of ow."""
def validate_single(field_value: Any, expected_type: Type, required: bool, name: str) -> None:
if field_value is None and required:
raise ValueError(f'"{name}" is required!')
if (field_value is not None or required) and not isinstance(field_value, expected_type):
raise ValueError(f'"{name}" must be of type "{expected_type.__name__}" but it is "{type(field_value).__name__}"!')

# Validate object
if isinstance(value, dict) and isinstance(predicate, dict):
for key, (field_type, required) in predicate.items():
field_value = value.get(key)
validate_single(field_value, field_type, required, key)
# Validate "primitive"
elif isinstance(value, (int, str, float, bool)) and isinstance(predicate, tuple) and value_name is not None:
field_type, required = predicate
validate_single(value, field_type, required, value_name)
else:
raise ValueError('Wrong input!')
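
For context, `_budget_ow` is a tiny home-grown stand-in for the `ow` argument-validation library used on the JavaScript side, and the request_queue.py changes below call it to validate inputs. A minimal usage sketch, assuming it is imported from `apify._utils`:

```python
from datetime import datetime

from apify._utils import _budget_ow

# Validate a single "primitive" value: the predicate is (expected_type, required).
_budget_ow('my-request-id', (str, True), 'request_id')

# Validate selected fields of a dict: the predicate maps field names to (type, required).
_budget_ow(
    {'id': 'abc123', 'uniqueKey': 'https://example.com'},
    {
        'id': (str, True),
        'uniqueKey': (str, True),
        'handledAt': (datetime, False),  # optional, may be missing or None
    },
)

# A missing required field (or a type mismatch) raises ValueError.
try:
    _budget_ow({'uniqueKey': 'https://example.com'}, {'url': (str, True)})
except ValueError as err:
    print(err)  # "url" is required!
```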
2 changes: 1 addition & 1 deletion src/apify/actor.py
@@ -570,7 +570,7 @@ async def _push_data_internal(self, data: Any) -> None:
if not data:
return

if not isinstance(data, list): # TODO: Memory storage does this on its own...
if not isinstance(data, list):
data = [data]

dataset = await self.open_dataset()
9 changes: 5 additions & 4 deletions src/apify/memory_storage/memory_storage.py
@@ -6,6 +6,8 @@
from aiofiles import ospath
from aiofiles.os import rename, scandir

from .._utils import _maybe_parse_bool
from ..consts import ApifyEnvVars
from .resource_clients.dataset import DatasetClient
from .resource_clients.dataset_collection import DatasetCollectionClient
from .resource_clients.key_value_store import KeyValueStoreClient
@@ -38,7 +40,7 @@ class MemoryStorage:
"""Indicates whether a purge was already performed on this instance"""

def __init__(
self, *, local_data_directory: str = './storage', write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None,
self, *, local_data_directory: Optional[str] = None, write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None,
) -> None:
"""Initialize the MemoryStorage.

@@ -47,13 +49,12 @@ def __init__(
persist_storage (bool, optional): Whether to persist the data to the `local_data_directory` or just keep them in memory
write_metadata (bool, optional): Whether to persist metadata of the storages as well
"""
self._local_data_directory = local_data_directory # TODO: Make this work with `APIFY_LOCAL_STORAGE_DIR`
self._local_data_directory = local_data_directory or os.getenv(ApifyEnvVars.LOCAL_STORAGE_DIR) or './storage'
self._datasets_directory = os.path.join(self._local_data_directory, 'datasets')
self._key_value_stores_directory = os.path.join(self._local_data_directory, 'key_value_stores')
self._request_queues_directory = os.path.join(self._local_data_directory, 'request_queues')
self._write_metadata = write_metadata if write_metadata is not None else '*' in os.getenv('DEBUG', '')
self._persist_storage = persist_storage if persist_storage is not None else not any(
os.getenv('APIFY_PERSIST_STORAGE', 'true') == s for s in ['false', '0', ''])
self._persist_storage = persist_storage if persist_storage is not None else _maybe_parse_bool(os.getenv(ApifyEnvVars.PERSIST_STORAGE, 'true'))
self._datasets_handled = []
self._key_value_stores_handled = []
self._request_queues_handled = []
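
The constructor now resolves its defaults from the environment: the storage directory falls back to the `APIFY_LOCAL_STORAGE_DIR` environment variable before `./storage`, and persistence is parsed through the shared `_maybe_parse_bool` helper instead of the inline string comparison. A rough sketch of the resulting behaviour (the helper lives in `apify/_utils.py`; the exact set of accepted truthy strings shown here is an assumption):

```python
import os
from typing import Optional

from apify.memory_storage import MemoryStorage


def _maybe_parse_bool_sketch(val: Optional[str]) -> bool:
    # Assumed behaviour of apify._utils._maybe_parse_bool: only explicit
    # truthy strings are treated as True.
    return val in ('true', 'True', '1')


# With these environment variables set...
os.environ['APIFY_LOCAL_STORAGE_DIR'] = '/tmp/my-storage'
os.environ['APIFY_PERSIST_STORAGE'] = 'false'

# ...MemoryStorage() picks both up without any constructor arguments:
storage = MemoryStorage()
# expected: storage._local_data_directory == '/tmp/my-storage'
# expected: storage._persist_storage is False
```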
3 changes: 0 additions & 3 deletions src/apify/memory_storage/resource_clients/request_queue.py
@@ -154,7 +154,6 @@ async def add_request(self, request: Dict, *, forefront: Optional[bool] = None)
Returns:
dict: The added request.
"""
# TODO: Throw if uniqueKey or url missing from request dict, also do for update_request...
existing_queue_by_id = _find_or_cache_request_queue_by_possible_id(self._client, self._name or self._id)

if existing_queue_by_id is None:
@@ -175,7 +174,6 @@ async def add_request(self, request: Dict, *, forefront: Optional[bool] = None)
}

existing_queue_by_id._requests[request_model['id']] = request_model
# TODO: Validate the next line logic, seems wrong in crawlee
existing_queue_by_id._pending_request_count += 0 if request_model['orderNo'] is None else 1
await existing_queue_by_id._update_timestamps(True)
await _update_request_queue_item(
@@ -248,7 +246,6 @@ async def update_request(self, request: Dict, *, forefront: Optional[bool] = Non
request_was_handled_before_update = existing_request['orderNo'] is None

# We add 1 pending request if previous state was handled
# TODO: Validate the next 2 lines logic, seems wrong in crawlee
if is_request_handled_state_changing:
pending_count_adjustment = 1 if request_was_handled_before_update else -1

2 changes: 1 addition & 1 deletion src/apify/storages/dataset.py
@@ -242,7 +242,7 @@ async def export_to(

if content_type == 'text/csv':
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) # TODO: Compare quoting behavior with TS impl
writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
writer.writerows([items[0].keys(), *[item.values() for item in items]])
value = output.getvalue()
return await key_value_store.set_value(key, value, content_type)
2 changes: 0 additions & 2 deletions src/apify/storages/key_value_store.py
@@ -123,12 +123,10 @@ async def iterate_keys(self, exclusive_start_key: Optional[str] = None) -> Async
where `key` is the record key, and `info` is an object that contains a single property `size`
indicating size of the record in bytes.
"""
index = 0
while True:
list_keys = await self._client.list_keys(exclusive_start_key=exclusive_start_key)
for item in list_keys['items']:
yield IterateKeysTuple(item['key'], {'size': item['size']})
index += 1

if not list_keys['isTruncated']:
break
26 changes: 21 additions & 5 deletions src/apify/storages/request_queue.py
@@ -10,7 +10,7 @@
from apify_client import ApifyClientAsync
from apify_client.clients import RequestQueueClientAsync

from .._utils import LRUCache, _crypto_random_object_id, _unique_key_to_request_id
from .._utils import LRUCache, _budget_ow, _crypto_random_object_id, _unique_key_to_request_id
from ..config import Configuration
from ..consts import REQUEST_QUEUE_HEAD_MAX_LIMIT
from ..memory_storage import MemoryStorage
@@ -126,7 +126,7 @@ async def _create_instance(cls, request_queue_id_or_name: str, client: Union[Api
def _get_default_name(cls, config: Configuration) -> str:
return config.default_request_queue_id

async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict: # TODO: Validate request with pydantic
async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
"""Add a request to the queue.

Args:
@@ -136,8 +136,14 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
Returns:
dict: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
"""
_budget_ow(request, {
'url': (str, True),
})
self._last_activity = datetime.utcnow()

if request.get('uniqueKey') is None:
request['uniqueKey'] = request['url'] # TODO: Check Request class in crawlee and replicate uniqueKey generation logic...

cache_key = _unique_key_to_request_id(request['uniqueKey'])
cached_info = self._requests_cache.get(cache_key)

@@ -174,7 +180,8 @@ async def get_request(self, request_id: str) -> Optional[Dict]:
Returns:
dict, optional: The retrieved request, or `None`, if it does not exist.
"""
return await self._client.get_request(request_id) # TODO: Maybe create a Request class?
_budget_ow(request_id, (str, True), 'request_id')
return await self._client.get_request(request_id) # TODO: Maybe create a Request dataclass?

async def fetch_next_request(self) -> Optional[Dict]:
"""Return the next request in the queue to be processed.
@@ -241,7 +248,7 @@ async def fetch_next_request(self) -> Optional[Dict]:

return request

async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TODO: Validate request with pydantic
async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:
"""Mark a request as handled after successful processing.

Handled requests will never again be returned by the `RequestQueue.fetch_next_request` method.
@@ -253,6 +260,11 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TOD
dict, optional: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
`None` if the given request was not in progress.
"""
_budget_ow(request, {
'id': (str, True),
'uniqueKey': (str, True),
'handledAt': (datetime, False),
})
self._last_activity = datetime.utcnow()
if request['id'] not in self._in_progress:
logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!')
@@ -272,7 +284,7 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TOD

return queue_operation_info

async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]: # TODO: Validate request with pydantic
async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]:
"""Reclaim a failed request back to the queue.

The request will be returned for processing later again
@@ -285,6 +297,10 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio
dict, optional: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
`None` if the given request was not in progress.
"""
_budget_ow(request, {
'id': (str, True),
'uniqueKey': (str, True),
})
self._last_activity = datetime.utcnow()

if request['id'] not in self._in_progress:
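
With the validation in place, `add_request` now rejects a request without a string `url` and, when the caller does not supply a `uniqueKey`, defaults it to the URL, so re-adding the same URL is reported as a duplicate. A short usage sketch; opening the queue via `Actor.open_request_queue()` is an assumption, by analogy with the `Actor.open_key_value_store()` calls in the test below:

```python
import asyncio

from apify import Actor


async def main() -> None:
    async with Actor:
        queue = await Actor.open_request_queue()

        # 'url' is checked by _budget_ow; omitting it (or passing a non-str) raises ValueError.
        await queue.add_request({'url': 'https://example.com'})

        # uniqueKey defaulted to the url above, so the same URL is detected as already present.
        info = await queue.add_request({'url': 'https://example.com'})
        print(info['wasAlreadyPresent'])  # expected: True


asyncio.run(main())
```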
54 changes: 54 additions & 0 deletions tests/unit/actor/test_actor_memory_storage_e2e.py
@@ -0,0 +1,54 @@
import pytest

from apify import Actor
from apify.config import Configuration
from apify.consts import ApifyEnvVars
from apify.memory_storage import MemoryStorage
from apify.storage_client_manager import StorageClientManager
from apify.storages import StorageManager


@pytest.mark.parametrize('purge_on_start', [True, False])
async def test_actor_memory_storage_e2e(monkeypatch: pytest.MonkeyPatch, tmp_path: str, purge_on_start: bool) -> None:
"""This test simulates two clean runs using memory storage.
The second run attempts to access data created by the first one.
We run 2 configurations with different `purge_on_start`."""
# Configure purging env var
monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, 'true' if purge_on_start else 'false')
# Store old storage client so we have the object reference for comparison
old_client = StorageClientManager.get_storage_client()
async with Actor:
old_default_kvs = await Actor.open_key_value_store()
old_non_default_kvs = await Actor.open_key_value_store('non-default')
# Create data in default and non-default key-value store
await old_default_kvs.set_value('test', 'default value')
await old_non_default_kvs.set_value('test', 'non-default value')

# Clean up singletons and mock a new memory storage
monkeypatch.setattr(Actor, '_default_instance', None)
monkeypatch.setattr(Configuration, '_default_instance', None)
monkeypatch.setattr(StorageManager, '_default_instance', None)
monkeypatch.setattr(StorageClientManager, '_default_instance', None)

new_patched_memory_storage = MemoryStorage(local_data_directory=tmp_path)

def get_storage_client() -> 'MemoryStorage':
return new_patched_memory_storage
monkeypatch.setattr(StorageClientManager, 'get_storage_client', get_storage_client)

# We simulate another clean run, we expect the memory storage to read from the local data directory
# Default storages are purged based on purge_on_start parameter.
async with Actor:
# Check if we're using a different memory storage instance
assert old_client is not StorageClientManager.get_storage_client()
default_kvs = await Actor.open_key_value_store()
assert default_kvs is not old_default_kvs
non_default_kvs = await Actor.open_key_value_store('non-default')
assert non_default_kvs is not old_non_default_kvs
default_value = await default_kvs.get_value('test')
non_default_value = await non_default_kvs.get_value('test')
if purge_on_start:
assert default_value is None
else:
assert default_value == 'default value'
assert non_default_value == 'non-default value'
8 changes: 3 additions & 5 deletions tests/unit/conftest.py
@@ -1,7 +1,7 @@
import asyncio
import inspect
from collections import defaultdict
from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, get_type_hints
from typing import Any, Callable, Dict, List, Optional, Tuple, get_type_hints

import pytest

@@ -121,7 +121,5 @@ def apify_client_async_patcher(monkeypatch: pytest.MonkeyPatch) -> ApifyClientAs


@pytest.fixture()
async def memory_storage(tmp_path: str) -> AsyncIterator[MemoryStorage]:
ms = MemoryStorage(local_data_directory=tmp_path, write_metadata=True) # TODO: Remove write_metadata=True as it's not the default setting...
yield ms
await ms.purge() # TODO: Do we want this here? there are unit tests specifically for purge
def memory_storage(tmp_path: str) -> MemoryStorage:
return MemoryStorage(local_data_directory=tmp_path)
14 changes: 8 additions & 6 deletions tests/unit/memory_storage/resource_clients/test_dataset.py
@@ -38,15 +38,16 @@ async def test_get(dataset_client: DatasetClient) -> None:

async def test_update(dataset_client: DatasetClient) -> None:
new_dataset_name = 'test-update'
await dataset_client.push_items({'abc': 123})
old_dataset_info = await dataset_client.get()
assert old_dataset_info is not None
old_dataset_directory = os.path.join(dataset_client._client._datasets_directory, old_dataset_info['name'])
new_dataset_directory = os.path.join(dataset_client._client._datasets_directory, new_dataset_name)
assert os.path.exists(os.path.join(old_dataset_directory, '__metadata__.json')) is True
assert os.path.exists(os.path.join(new_dataset_directory, '__metadata__.json')) is False
assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is True
assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is False
updated_dataset_info = await dataset_client.update(name=new_dataset_name)
assert os.path.exists(os.path.join(old_dataset_directory, '__metadata__.json')) is False
assert os.path.exists(os.path.join(new_dataset_directory, '__metadata__.json')) is True
assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is False
assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is True
# Only modifiedAt and accessedAt should be different
assert old_dataset_info['createdAt'] == updated_dataset_info['createdAt']
assert old_dataset_info['modifiedAt'] != updated_dataset_info['modifiedAt']
@@ -57,12 +58,13 @@ async def test_update(dataset_client: DatasetClient) -> None:


async def test_delete(dataset_client: DatasetClient) -> None:
await dataset_client.push_items({'abc': 123})
dataset_info = await dataset_client.get()
assert dataset_info is not None
dataset_directory = os.path.join(dataset_client._client._datasets_directory, dataset_info['name'])
assert os.path.exists(os.path.join(dataset_directory, '__metadata__.json')) is True
assert os.path.exists(os.path.join(dataset_directory, '000000001.json')) is True
await dataset_client.delete()
assert os.path.exists(os.path.join(dataset_directory, '__metadata__.json')) is False
assert os.path.exists(os.path.join(dataset_directory, '000000001.json')) is False
# Does not crash when called again
await dataset_client.delete()

@@ -1,4 +1,3 @@
import os

import pytest

@@ -14,10 +13,8 @@ def datasets_client(memory_storage: MemoryStorage) -> DatasetCollectionClient:
async def test_get_or_create(datasets_client: DatasetCollectionClient) -> None:
dataset_name = 'test'
# A new dataset gets created
assert os.path.exists(os.path.join(datasets_client._datasets_directory, dataset_name, '__metadata__.json')) is False
dataset_info = await datasets_client.get_or_create(name=dataset_name)
assert dataset_info['name'] == dataset_name
assert os.path.exists(os.path.join(datasets_client._datasets_directory, dataset_name, '__metadata__.json')) is True
# Another get_or_create call returns the same dataset
dataset_info_existing = await datasets_client.get_or_create(name=dataset_name)
assert dataset_info['id'] == dataset_info_existing['id']