feat: better import #228

Merged: 6 commits, Aug 8, 2024
Changes from 5 commits
2 changes: 1 addition & 1 deletion .flake8
@@ -2,4 +2,4 @@
ignore = E203, E501, W503
max-line-length = 88
exclude = .git,__pycache__,build,dist,*_pb2.py,.venv
max-doc-length = 79
max-doc-length = 88
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -36,4 +36,5 @@ repos:
- id: check-translations
name: "Check if translations files in 'frontend/xliff' need to be updated"
entry: make check_translations
types_or: [ts,javascript,pofile]
language: system
183 changes: 130 additions & 53 deletions app/_import.py
@@ -1,4 +1,5 @@
import abc
import math
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
@@ -10,7 +11,7 @@
from elasticsearch_dsl import Index, Search
from redis import Redis

from app._types import JSONType
from app._types import FetcherResult, FetcherStatus, JSONType
from app.config import Config, IndexConfig, TaxonomyConfig
from app.indexing import (
DocumentProcessor,
@@ -29,7 +30,7 @@ def __init__(self, config: IndexConfig) -> None:
self.config = config

@abc.abstractmethod
def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
def fetch_document(self, stream_name: str, item: JSONType) -> FetcherResult:
"""Fetch a document using elements coming from a Redis stream.

The Redis stream contains information about documents that were
@@ -38,6 +39,7 @@ def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:

:param stream_name: the name of the Redis stream
:param item: the item from the Redis stream
:return: the fetched document and a status that drives the action to be taken
"""
pass
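
For context on the new contract, here is a minimal sketch of what a concrete fetcher could look like now that `fetch_document` returns a `FetcherResult`. The class name, the `code` and `action` keys on the stream item, and the `_load_full_document` helper are illustrative assumptions, not code from this PR:

```python
from app._import import BaseDocumentFetcher
from app._types import FetcherResult, FetcherStatus, JSONType


class ExampleFetcher(BaseDocumentFetcher):
    """Illustrative fetcher mapping a Redis stream item to a FetcherResult."""

    def fetch_document(self, stream_name: str, item: JSONType) -> FetcherResult:
        code = item.get("code")  # assumed id field carried by the stream item
        if not code:
            # malformed event: tell the caller to ignore it
            return FetcherResult(status=FetcherStatus.SKIP, document=None)
        if item.get("action") == "deleted":  # assumed deletion marker
            # the caller will turn this into a delete on the index
            return FetcherResult(
                status=FetcherStatus.REMOVED, document={"_id": code}
            )
        document = self._load_full_document(code)
        if document is None:
            # source unreachable right now: ask for a retry
            # (the callers above currently only log this case)
            return FetcherResult(status=FetcherStatus.RETRY, document=None)
        return FetcherResult(status=FetcherStatus.FOUND, document=document)

    def _load_full_document(self, code: str) -> JSONType | None:
        # placeholder: a real fetcher would query the product API or database
        return {"_id": code, "code": code}
```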

@@ -59,9 +61,9 @@ def get_processed_since(
id_field_name: str,
document_fetcher: BaseDocumentFetcher,
batch_size: int = 100,
) -> Iterator[tuple[int, JSONType]]:
) -> Iterator[tuple[int, FetcherResult]]:
"""Fetches all the documents that have been processed since the given
timestamp.
timestamp (using redis event stream).

:param redis_client: the Redis client
:param redis_stream_name: the name of the Redis stream to read from
@@ -71,7 +73,8 @@ def get_processed_since(
:param batch_size: the size of the batch to fetch, defaults to 100
:yield: a tuple containing the timestamp (in milliseconds) and the document
"""
fetched_ids = set()
# store fetched ids with a timestamp
fetched_ids: dict[str, int] = {}
# We start from the given timestamp
min_id = f"{start_timestamp_ms}-0"

@@ -96,15 +99,32 @@
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
# Avoid fetching the same ID repeatedly
if id_ in fetched_ids:
if id_ in fetched_ids and fetched_ids[id_] > timestamp:
logger.debug(f"Skipping ID {id_} because it was already fetched")
continue
fetched_ids.add(id_)
document = document_fetcher.fetch_document(redis_stream_name, item)
if document is None:
logger.debug(f"Skipping ID {id_} because it was not found")
continue
yield timestamp, document
# use the current timestamp
# as a pessimistic timestamp of the last index update
# *1000 because Python timestamps are in seconds,
# while Redis uses milliseconds
fetched_ids[id_] = math.floor(datetime.now().timestamp() * 1000)
result = document_fetcher.fetch_document(redis_stream_name, item)
if result.status == FetcherStatus.SKIP:
logger.debug(f"Skipping ID {id_} because fetches stated to do so")
elif result.status == FetcherStatus.RETRY:
logger.warning(
f"Should retry ID {id_} due to status RETRY, but it's not yet implemented!"
)
elif result.status == FetcherStatus.REMOVED:
yield timestamp, result
elif result.status == FetcherStatus.FOUND:
if result.document is None:
logger.error(
f"Document is None for ID {id_}, while status is FOUND !"
)
else:
yield timestamp, result
else:
logger.debug(f"Skipping ID {id_} due to status {result.status.name}")


def get_new_updates(
@@ -113,7 +133,7 @@ def get_new_updates(
id_field_names: dict[str, str],
document_fetchers: dict[str, BaseDocumentFetcher],
batch_size: int = 100,
) -> Iterator[tuple[str, int, JSONType]]:
) -> Iterator[tuple[str, int, FetcherResult]]:
"""Reads new updates from Redis Stream, starting from the moment this
function is called.

@@ -152,27 +172,52 @@
logger.debug("Fetched ID: %s", id_)
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
document = document_fetcher.fetch_document(stream_name, item)
if document is None:
result = document_fetcher.fetch_document(stream_name, item)
if result.status == FetcherStatus.SKIP:
logger.debug(
f"Skipping ID {id_} in {stream_name} because fetches stated to do so"
)
elif result.status == FetcherStatus.RETRY:
logger.warning(
f"Should retry ID {id_} in {stream_name} due to status RETRY, "
"but it's not yet implemented!"
)
elif result.status == FetcherStatus.REMOVED:
yield stream_name, timestamp, result
elif result.status == FetcherStatus.FOUND:
if result.document is None:
logger.error(
f"Document is None for ID {id_} in {stream_name}, while status is FOUND !"
)
else:
yield stream_name, timestamp, result
else:
logger.debug(
"Stream %s: Skipping ID %s because it was not found",
stream_name,
id_,
f"Skipping ID {id_} in {stream_name} due to status {result.status.name}"
)
continue
yield stream_name, timestamp, document


def get_document_dict(
processor: DocumentProcessor, row: JSONType, next_index: str
processor: DocumentProcessor, result: FetcherResult, index_name: str
) -> JSONType | None:
"""Return the document dict suitable for a bulk insert operation."""
document = processor.from_dict(row)
if result.document is None:
return None
result = processor.from_result(result)
document = result.document
if not document:
return None

_id = document.pop("_id")
return {"_source": document, "_index": next_index, "_id": _id}
elif result.status == FetcherStatus.FOUND:
_id = document.pop("_id")
return {"_source": document, "_index": index_name, "_id": _id}
elif result.status == FetcherStatus.REMOVED:
return {
"_op_type": "delete",
"_index": index_name,
"_id": document["_id"],
}
else:
return None
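
To make the new delete path concrete, these are the two bulk-action shapes `get_document_dict` can now return (an index action for `FOUND`, a delete action for `REMOVED`); the index name and field values below are invented:

```python
# Action emitted for a FOUND result: the document's "_id" is popped and
# promoted to the bulk metadata, the rest becomes the indexed source.
index_action = {
    "_source": {"code": "1234567890123", "product_name": "Example product"},
    "_index": "example-index-2024-08-08",
    "_id": "1234567890123",
}

# Action emitted for a REMOVED result: an explicit delete operation that
# the Elasticsearch bulk helpers turn into a delete request.
delete_action = {
    "_op_type": "delete",
    "_index": "example-index-2024-08-08",
    "_id": "1234567890123",
}
```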


def gen_documents(
@@ -194,7 +239,11 @@ def gen_documents(
if i % num_processes != process_id:
continue

document_dict = get_document_dict(processor, row, next_index)
document_dict = get_document_dict(
processor,
FetcherResult(status=FetcherStatus.FOUND, document=row),
next_index,
)
if not document_dict:
continue

@@ -293,6 +342,7 @@ def import_parallel(
)
if not success:
logger.error("Encountered errors: %s", errors)
return success, errors


def import_taxonomies(config: IndexConfig, next_index: str):
@@ -342,29 +392,30 @@ def get_redis_products(
logger.info("Processing redis updates since %s", last_updated_timestamp_ms)
redis_client = connection.get_redis_client()
processed = 0
for _, row in get_processed_since(
for _, result in get_processed_since(
redis_client,
stream_name,
last_updated_timestamp_ms,
id_field_name,
document_fetcher=fetcher,
):
yield get_document_dict(processor, row, index)
document_dict = get_document_dict(processor, result, index)
if document_dict:
yield document_dict
processed += 1
logger.info("Processed %d updates from Redis", processed)


def get_redis_updates(
es_client: Elasticsearch, index: str, config: IndexConfig
) -> None:
def get_redis_updates(es_client: Elasticsearch, index: str, config: IndexConfig) -> int:
"""Fetch updates from Redis and index them.

:param index: the index to write to
:param config: the index configuration to use
:return: the number of errors encountered
"""
if config.redis_stream_name is None:
logger.info(f"Redis updates are disabled for index {index}")
return
return 0

processor = DocumentProcessor(config)
fetcher = load_document_fetcher(config)
@@ -385,6 +436,7 @@ def get_redis_updates(
last_updated_timestamp_ms = int(last_updated_timestamp * 1000)
id_field_name = config.index.id_field_name
# Since this is only done by a single process, we can use parallel_bulk
num_errors = 0
for success, info in parallel_bulk(
es_client,
get_redis_products(
Expand All @@ -398,14 +450,17 @@ def get_redis_updates(
):
if not success:
logger.warning("A document failed: %s", info)
num_errors += 1
return num_errors


def run_full_import(
def run_items_import(
file_path: Path,
num_processes: int,
config: IndexConfig,
num_items: int | None = None,
skip_updates: bool = False,
partial: bool = False,
):
"""Run a full data import from a JSONL.

@@ -420,16 +475,23 @@
import
:param config: the index configuration to use
:param num_items: the number of items to import, defaults to None (all)
:param skip_updates: if True, skip the updates from Redis
:param partial: (exclusive with `skip_updates`),
if True, assume this is not a full import
and update items directly in the current index.
"""
es_client = connection.get_es_client()
# we create a temporary index to import to
# at the end we will change alias to point to it
index_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
next_index = f"{config.index.name}-{index_date}"

index = generate_index_object(next_index, config)
# create the index
index.save()
if not partial:
# we create a temporary index to import to
# at the end we will change alias to point to it
index_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
next_index = f"{config.index.name}-{index_date}"
index = generate_index_object(next_index, config)
# create the index
index.save()
else:
# use current index
next_index = config.index.name

# split the work between processes
args = []
@@ -445,13 +507,18 @@
)
)
# run in parallel
num_errors = 0
with Pool(num_processes) as pool:
pool.starmap(import_parallel, args)
for success, errors in pool.starmap(import_parallel, args):
if not success:
num_errors += len(errors)
# apply the latest updates from Redis (hopefully covering everything since the JSONL dump)
if not skip_updates:
get_redis_updates(es_client, next_index, config)
# make alias point to new index
update_alias(es_client, next_index, config.index.name)
num_errors += get_redis_updates(es_client, next_index, config)
if not partial:
# make alias point to new index
update_alias(es_client, next_index, config.index.name)
return num_errors
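
A sketch of how the renamed entry point might be called, assuming an `IndexConfig` has already been loaded elsewhere; the file paths and process counts are made up:

```python
from pathlib import Path

from app._import import run_items_import
from app.config import IndexConfig


def reimport(index_config: IndexConfig) -> int:
    # Full import: a new timestamped index is created, Redis updates are
    # applied on top, then the alias is switched to the new index.
    errors = run_items_import(
        Path("/data/products.jsonl"),  # made-up path
        num_processes=4,
        config=index_config,
    )
    # Partial import: documents are written straight into the current index;
    # no new index is created and the alias is left untouched.
    errors += run_items_import(
        Path("/data/products-delta.jsonl"),  # made-up path
        num_processes=2,
        config=index_config,
        partial=True,
    )
    return errors
```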


def perform_taxonomy_import(config: IndexConfig) -> None:
@@ -510,20 +577,30 @@ def run_update_daemon(config: Config) -> None:
id_field_names[stream_name] = index_config.index.id_field_name
stream_name_to_index_id[stream_name] = index_id

for stream_name, _, document in get_new_updates(
for stream_name, _, result in get_new_updates(
redis_client,
list(id_field_names.keys()),
id_field_names=id_field_names,
document_fetchers=document_fetchers,
):
processed_document = processors[stream_name].from_dict(document)
if not processed_document:
processed_result = processors[stream_name].from_result(result)
processed_document = processed_result.document
if (
processed_result.status not in (FetcherStatus.FOUND, FetcherStatus.REMOVED)
or processed_document is None
):
continue
_id = processed_document.pop("_id")
index_id = stream_name_to_index_id[stream_name]
logger.debug("Document:\n%s", processed_document)
es_client.index(
index=config.indices[index_id].index.name,
body=processed_document,
id=_id,
)
if processed_result.status == FetcherStatus.REMOVED:
es_client.delete(
index=config.indices[index_id].index.name,
id=_id,
)
else:
es_client.index(
index=config.indices[index_id].index.name,
body=processed_document,
id=_id,
)
25 changes: 25 additions & 0 deletions app/_types.py
@@ -1,4 +1,5 @@
import textwrap
from enum import Enum
from functools import cached_property
from typing import Annotated, Any, Literal, Optional, Tuple, Union, cast, get_type_hints

@@ -432,3 +433,27 @@ class GetSearchParamsTypes:
facets = _annotation_new_type(str, SEARCH_PARAMS_ANN["facets"])
charts = _annotation_new_type(str, SEARCH_PARAMS_ANN["charts"])
index_id = SEARCH_PARAMS_ANN["index_id"]


class FetcherStatus(Enum):
"""Status of a fetcher

* FOUND - document was found, index it
* REMOVED - document was removed, remove it
* SKIP - skip this document / update
* RETRY - retry this document / update later
* OTHER - unknown error
"""

FOUND = 1
REMOVED = -1
SKIP = 0
RETRY = 2
OTHER = 3


class FetcherResult(BaseModel):
"""Result for a document fecher"""

status: FetcherStatus
document: JSONType | None
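
As a quick usage illustration of the new types (ids and field values below are invented), a fetcher wraps its outcome like this and callers branch on the status:

```python
from app._types import FetcherResult, FetcherStatus

found = FetcherResult(
    status=FetcherStatus.FOUND,
    document={"_id": "1234", "product_name": "Example"},
)
removed = FetcherResult(status=FetcherStatus.REMOVED, document={"_id": "1234"})
skipped = FetcherResult(status=FetcherStatus.SKIP, document=None)

for result in (found, removed, skipped):
    if result.status == FetcherStatus.FOUND and result.document is not None:
        print("index", result.document["_id"])
    elif result.status == FetcherStatus.REMOVED and result.document is not None:
        print("delete", result.document["_id"])
    else:
        print("ignore", result.status.name)
```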