diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4889b0e7..536b2805 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,11 @@ repos: # Note for all linters: do not forget to update pyproject.toml when updating version. + - repo: https://github.com/python-poetry/poetry + rev: 1.8.4 + hooks: + - id: poetry-lock + args: ["--check"] + - repo: https://github.com/psf/black-pre-commit-mirror rev: 24.8.0 hooks: @@ -15,6 +21,7 @@ repos: rev: 5.13.2 hooks: - id: isort + - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.11.1 hooks: diff --git a/Dockerfile b/Dockerfile index ab7830f8..edfcdb57 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,10 @@ ENV PYTHONUNBUFFERED=1 \ FROM python-base as builder-base RUN curl -sSL https://install.python-poetry.org | python3 - WORKDIR $PYSETUP_PATH -COPY poetry.lock pyproject.toml ./ +# we need README.md for poetry check +COPY poetry.lock pyproject.toml README.md ./ +RUN poetry check --lock || \ + ( echo "Poetry.lock is outdated, please run make update_poetry_lock" && false ) RUN poetry install --without dev # This is our final image @@ -40,6 +43,10 @@ COPY --from=builder-base $POETRY_HOME $POETRY_HOME RUN poetry config virtualenvs.create false ENV POETRY_VIRTUALENVS_IN_PROJECT=false +# create some folders, to later ensure right ownership +RUN mkdir -p /opt/search/data && \ + mkdir -p /opt/search/synonyms + # create off user ARG USER_UID ARG USER_GID @@ -66,8 +73,11 @@ CMD ["uvicorn", "app.api:app", "--proxy-headers", "--host", "0.0.0.0", "--port", # ---------------------- FROM builder-base as builder-dev WORKDIR $PYSETUP_PATH -COPY poetry.lock pyproject.toml ./ +# we need README.md for poetry check +COPY poetry.lock pyproject.toml README.md ./ # full install, with dev packages +RUN poetry check --lock || \ + ( echo "Poetry.lock is outdated, please run make update_poetry_lock" && false ) RUN poetry install # image with dev tooling diff --git a/Makefile b/Makefile index 2bd6409b..795f1fd8 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,7 @@ endif DOCKER_COMPOSE=docker compose --env-file=${ENV_FILE} DOCKER_COMPOSE_TEST=COMPOSE_PROJECT_NAME=search_test docker compose --env-file=${ENV_FILE} +.PHONY: build create_external_volumes livecheck up down test test_front test_front_watch test_api import-dataset import-taxonomies sync-scripts build-translations generate-openapi check check_front check_translations lint lint_back lint_front #------------# # Production # #------------# @@ -58,7 +59,7 @@ livecheck: build: @echo "🔎 building docker (for dev)" - ${DOCKER_COMPOSE} build --progress=plain + ${DOCKER_COMPOSE} build --progress=plain ${args} up: _ensure_network @@ -107,15 +108,34 @@ tsc_watch: @echo "🔎 Running front-end tsc in watch mode..." ${DOCKER_COMPOSE} run --rm search_nodejs npm run build:watch +update_poetry_lock: + @echo "🔎 Updating poetry.lock" + ${DOCKER_COMPOSE} run --rm api poetry lock --no-update + #-------# # Tests # #-------# -test: _ensure_network test_api test_front +test: _ensure_network check_poetry_lock test_api test_front + +check_poetry_lock: + @echo "🔎 Checking poetry.lock" +# we have to mount whole project folder for pyproject will be checked + ${DOCKER_COMPOSE} run -v $$(pwd):/project -w /project --rm api poetry check --lock + +test_api: test_api_unit test_api_integration + +test_api_unit: + @echo "🔎 Running API unit tests..." 
+ ${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/ --ignore=tests/int + +# you can use keep_es=1 to avoid stopping elasticsearch after tests (useful during development) +test_api_integration: + @echo "🔎 Running API integration tests..." + ${DOCKER_COMPOSE_TEST} up -d es01 es02 elasticvue + ${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/ --ignore=tests/unit + test -z "${keep_es}" && ${DOCKER_COMPOSE_TEST} stop es01 es02 elasticvue || true -test_api: - @echo "🔎 Running API tests..." - ${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/ test_front: @echo "🔎 Running front-end tests..." @@ -125,6 +145,10 @@ test_front_watch: @echo "🔎 Running front-end tests..." ${DOCKER_COMPOSE_TEST} run --rm search_nodejs npm run test:watch +test_clean: + @echo "🔎 Cleaning tests instances..." + ${DOCKER_COMPOSE_TEST} down -v + #-----------# # Utilities # #-----------# diff --git a/app/_import.py b/app/_import.py index 6330cc22..7789af50 100644 --- a/app/_import.py +++ b/app/_import.py @@ -12,13 +12,14 @@ from redis import Redis from app._types import FetcherResult, FetcherStatus, JSONType -from app.config import Config, IndexConfig, TaxonomyConfig +from app.config import Config, IndexConfig, TaxonomyConfig, settings from app.indexing import ( DocumentProcessor, generate_index_object, generate_taxonomy_index_object, ) -from app.taxonomy import get_taxonomy +from app.taxonomy import iter_taxonomies +from app.taxonomy_es import refresh_synonyms from app.utils import connection, get_logger, load_class_object_from_string from app.utils.io import jsonl_iter @@ -226,17 +227,17 @@ def gen_documents( next_index: str, num_items: int | None, num_processes: int, - process_id: int, + process_num: int, ): - """Generate documents to index for process number process_id + """Generate documents to index for process number process_num - We chunk documents based on document num % process_id + We chunk documents based on document num % process_num """ for i, row in enumerate(tqdm.tqdm(jsonl_iter(file_path))): if num_items is not None and i >= num_items: break # Only get the relevant - if i % num_processes != process_id: + if i % num_processes != process_num: continue document_dict = get_document_dict( @@ -260,26 +261,26 @@ def gen_taxonomy_documents( :param supported_langs: a set of supported languages :yield: a dict with the document to index, compatible with ES bulk API """ - for taxonomy_source_config in tqdm.tqdm(taxonomy_config.sources): - taxonomy = get_taxonomy( - taxonomy_source_config.name, str(taxonomy_source_config.url) - ) + for taxonomy_name, taxonomy in tqdm.tqdm(iter_taxonomies(taxonomy_config)): for node in taxonomy.iter_nodes(): - names = {} - for lang in supported_langs: - lang_names = set() - if lang in node.names: - lang_names.add(node.names[lang]) - if lang in node.synonyms: - lang_names |= set(node.synonyms[lang]) - names[lang] = list(lang_names) + names = { + lang: lang_names + for lang, lang_names in node.names.items() + if lang in supported_langs + } + synonyms = { + lang: lang_names + for lang, lang_names in node.synonyms.items() + if lang in supported_langs + } yield { "_index": next_index, "_source": { "id": node.id, - "taxonomy_name": taxonomy_source_config.name, - "names": names, + "taxonomy_name": taxonomy_name, + "name": names, + "synonyms": synonyms, }, } @@ -304,13 +305,22 @@ def update_alias(es_client: Elasticsearch, next_index: str, index_alias: str): ) +def get_alias(es_client: Elasticsearch, index_name: str): + """Get the current index pointed by the alias.""" + resp = 
es_client.indices.get_alias(name=index_name) + resp = list(resp.keys()) + if len(resp) == 0: + return None + return resp[0] + + def import_parallel( config: IndexConfig, file_path: Path, next_index: str, num_items: int | None, num_processes: int, - process_id: int, + process_num: int, ): """One task of import. @@ -318,12 +328,12 @@ def import_parallel( :param str next_index: the index to write to :param int num_items: max number of items to import, default to no limit :param int num_processes: total number of processes - :param int process_id: the index of the process + :param int process_num: the index of the process (from 0 to num_processes - 1) """ processor = DocumentProcessor(config) # open a connection for this process - es = connection.get_es_client(timeout=120, retry_on_timeout=True) + es = connection.get_es_client(request_timeout=120, retry_on_timeout=True) # Note that bulk works better than parallel bulk for our usecase. # The preprocessing in this file is non-trivial, so it's better to # parallelize that. If we then do parallel_bulk here, this causes queueing @@ -336,13 +346,11 @@ def import_parallel( next_index, num_items, num_processes, - process_id, + process_num, ), raise_on_error=False, ) - if not success: - logger.error("Encountered errors: %s", errors) - return success, errors + return process_num, success, errors def import_taxonomies(config: IndexConfig, next_index: str): @@ -353,8 +361,7 @@ def import_taxonomies(config: IndexConfig, next_index: str): :param config: the index configuration to use :param next_index: the index to write to """ - # open a connection for this process - es = connection.get_es_client(timeout=120, retry_on_timeout=True) + es = connection.current_es_client() # Note that bulk works better than parallel bulk for our usecase. # The preprocessing in this file is non-trivial, so it's better to # parallelize that. If we then do parallel_bulk @@ -480,7 +487,8 @@ def run_items_import( if True consider we don't have a full import, and directly updates items in current index. 
""" - es_client = connection.get_es_client() + # we need a large timeout as index creation can take a while because of synonyms + es_client = connection.get_es_client(request_timeout=600) if not partial: # we create a temporary index to import to # at the end we will change alias to point to it @@ -488,7 +496,7 @@ def run_items_import( next_index = f"{config.index.name}-{index_date}" index = generate_index_object(next_index, config) # create the index - index.save() + index.save(using=es_client) else: # use current index next_index = config.index.name @@ -509,12 +517,18 @@ def run_items_import( # run in parallel num_errors = 0 with Pool(num_processes) as pool: - for success, errors in pool.starmap(import_parallel, args): - if not success: + for i, success, errors in pool.starmap(import_parallel, args): + # Note: we log here instead of in sub-process because + # it's easier to avoid mixing logs, and it works better for pytest + logger.info("[%d] Indexed %d documents", i, success) + if errors: + logger.error("[%d] Encountered %d errors: %s", i, len(errors), errors) num_errors += len(errors) # update with last index updates (hopefully since the jsonl) if not skip_updates: num_errors += get_redis_updates(es_client, next_index, config) + # wait for index refresh + es_client.indices.refresh(index=next_index) if not partial: # make alias point to new index update_alias(es_client, next_index, config.index.name) @@ -537,11 +551,38 @@ def perform_taxonomy_import(config: IndexConfig) -> None: index.save() import_taxonomies(config, next_index) + # wait for index refresh + es_client.indices.refresh(index=next_index) # make alias point to new index update_alias(es_client, next_index, config.taxonomy.index.name) +def perform_cleanup_indexes(config: IndexConfig) -> int: + """Delete old indexes (that have no active alias on them).""" + removed = 0 + # some timeout for it can be long + es_client = connection.get_es_client(request_timeout=600) + prefixes = [config.index.name, config.taxonomy.index.name] + for prefix in prefixes: + # get all indexes + indexes = es_client.indices.get_alias(index=f"{prefix}-*") + # remove all index without alias + to_remove = [ + index for index, data in indexes.items() if not data.get("aliases") + ] + for index in to_remove: + logger.info("Deleting index %s", index) + es_client.indices.delete(index=index) + removed += 1 + return removed + + +def perform_refresh_synonyms(index_id: str, config: IndexConfig) -> None: + """Refresh synonyms files generated by taxonomies.""" + refresh_synonyms(index_id, config, settings.synonyms_path) + + def run_update_daemon(config: Config) -> None: """Run the update import daemon. diff --git a/app/_types.py b/app/_types.py index 7712c89c..04bc7dc5 100644 --- a/app/_types.py +++ b/app/_types.py @@ -1,12 +1,12 @@ -import textwrap -from enum import Enum +from enum import Enum, StrEnum from functools import cached_property -from typing import Annotated, Any, Literal, Optional, Tuple, Union, cast, get_type_hints +from inspect import cleandoc as cd_ +from typing import Annotated, Any, Literal, Optional, Tuple, Union, cast import elasticsearch_dsl.query import luqum.tree from fastapi import Query -from pydantic import BaseModel, ConfigDict, model_validator +from pydantic import BaseModel, ConfigDict, field_validator, model_validator from . 
import config from .utils import str_utils @@ -21,22 +21,22 @@ JSONType = dict[str, Any] -class DistributionChartType(BaseModel): +class DistributionChart(BaseModel): """Describes an entry for a distribution chart""" - chart_type: Literal["DistributionChartType"] = "DistributionChartType" + chart_type: Literal["DistributionChart"] = "DistributionChart" field: str -class ScatterChartType(BaseModel): +class ScatterChart(BaseModel): """Describes an entry for a scatter plot""" - chart_type: Literal["ScatterChartType"] = "ScatterChartType" + chart_type: Literal["ScatterChart"] = "ScatterChart" x: str y: str -ChartType = Union[DistributionChartType, ScatterChartType] +ChartType = Union[DistributionChart, ScatterChart] class FacetItem(BaseModel): @@ -73,8 +73,18 @@ class FacetInfo(BaseModel): """Data about selected filters for each facet: facet name -> list of values""" +class DebugInfo(StrEnum): + """Debug information to return in the API""" + + aggregations = "aggregations" + lucene_query = "lucene_query" + es_query = "es_query" + + class SearchResponseDebug(BaseModel): - query: JSONType + lucene_query: str | None = None + es_query: JSONType | None = None + aggregations: JSONType | None = None class SearchResponseError(BaseModel): @@ -98,7 +108,7 @@ class SuccessSearchResponse(BaseModel): page: int page_size: int page_count: int - debug: SearchResponseDebug + debug: SearchResponseDebug | None = None took: int timed_out: bool count: int @@ -126,12 +136,6 @@ class QueryAnalysis(BaseModel): es_query: Optional[elasticsearch_dsl.query.Query] = None """The query as an elasticsearch_dsl object""" - fulltext: Optional[str] = None - """The full text part of the query""" - - filter_query: Optional[JSONType] = None - """The filter part of the query""" - facets_filters: Optional[FacetsFilters] = None """The filters corresponding to the facets: a facet name and a list of values""" @@ -141,8 +145,6 @@ def clone(self, **kwargs): text_query=self.text_query, luqum_tree=self.luqum_tree, es_query=self.es_query, - fulltext=self.fulltext, - filter_query=self.filter_query, facets_filters=self.facets_filters, ) for k, v in kwargs.items(): @@ -155,112 +157,102 @@ def _dict_dump(self): "text_query": self.text_query, "luqum_tree": str(self.luqum_tree), "es_query": self.es_query.to_dict(), - "fulltext": self.fulltext, - "filter_query": self.filter_query, "facets_filters": self.facets_filters, } -INDEX_ID_QUERY_PARAM = Query( - description="""Index ID to use for the search, if not provided, the default index is used. - If there is only one index, this parameter is not needed.""" -) +class CommonParametersQuery: + """Documentation and constraints for some common query parameters""" + index_id = Query( + description=cd_( + """Index ID to use for the search, if not provided, the default index is used. + If there is only one index, this parameter is not needed. + """ + ) + ) -class SearchParameters(BaseModel): - """Common parameters for search""" + +class QuerySearchParameters(BaseModel): + """Parameters for search, + this class concentrates on parameters that define the search query + """ q: Annotated[ str | None, Query( - description="""The search query, it supports Lucene search query -syntax (https://lucene.apache.org/core/3_6_0/queryparsersyntax.html). Words -that are not recognized by the lucene query parser are searched as full text -search. + description=cd_( + """The search query, it supports Lucene search query + syntax (https://lucene.apache.org/core/3_6_0/queryparsersyntax.html). 
Words + that are not recognized by the lucene query parser are searched as full text + search. -Example: `categories_tags:"en:beverages" strawberry brands:"casino"` query use a -filter clause for categories and brands and look for "strawberry" in multiple -fields. + Example: `categories_tags:"en:beverages" strawberry brands:"casino"` query use a + filter clause for categories and brands and look for "strawberry" in multiple + fields. -The query is optional, but `sort_by` value must then be provided.""" + The query is optional, but `sort_by` value must then be provided. + """ + ) ), ] = None - langs: Annotated[ - list[str], + boost_phrase: Annotated[ + bool, Query( - description="""List of languages we want to support during search. -This list should include the user expected language, and additional languages (such -as english for example). + description=cd_( + """This enables an heuristic that will favor, + matching terms that are consecutive. -This is currently used for language-specific subfields to choose in which -subfields we're searching in. + Technically, if you have a query with the two words `whole milk` + it will boost entries with `"whole milk"` exact match. + The boost factor is defined by `match_phrase_boost` value in Configuration -If not provided, `['en']` is used.""" - ), - ] = ["en"] - page_size: Annotated[ - int, Query(description="Number of results to return per page.") - ] = 10 - page: Annotated[int, Query(ge=1, description="Page to request, starts at 1.")] = 1 - fields: Annotated[ - list[str] | None, - Query( - description="List of fields to include in the response. All other fields will be ignored." - ), - ] = None - sort_by: Annotated[ - str | None, - Query( - description=textwrap.dedent( + Note, that it only make sense if you use best match sorting. + So in any other case it is ignored. """ - Field name to use to sort results, the field should exist - and be sortable. If it is not provided, results are sorted by descending relevance score. - - If you put a minus before the name, the results will be sorted by descending order. - - If the field name match a known script (defined in your configuration), - it will be use for sorting. - - In this case you also need to provide additional parameters corresponding to your script parameters. - If a script needs parameters, you can only use the POST method. + ) + ), + ] = False - Beware that this may have a big [impact on performance][perf_link] + langs: Annotated[ + list[str], + Query( + description=cd_( + """List of languages we want to support during search. + This list should include the user expected language, and additional languages (such + as english for example). - Also bare in mind [privacy considerations][privacy_link] if your script parameters contains sensible data. + This is currently used for language-specific subfields to choose in which + subfields we're searching in. - [perf_link]: https://openfoodfacts.github.io/search-a-licious/users/how-to-use-scripts/#performance-considerations - [privacy_link]: https://openfoodfacts.github.io/search-a-licious/users/how-to-use-scripts/#performance-considerations - """ + If not provided, `['en']` is used. + """ ) ), - ] = None - facets: Annotated[ - list[str] | None, - Query( - description="""Name of facets to return in the response as a comma-separated value. - If None (default) no facets are returned.""" - ), - ] = None - charts: Annotated[ - list[ChartType] | None, - Query( - description="""Name of vega representations to return in the response. 
- Can be distribution chart or scatter plot""" - ), - ] = None - sort_params: Annotated[ - JSONType | None, - Query( - description="""Additional parameters when using a sort script in sort_by. - If the sort script needs parameters, you can only be used the POST method.""", - ), - ] = None + ] = ["en"] + index_id: Annotated[ str | None, - INDEX_ID_QUERY_PARAM, + CommonParametersQuery.index_id, ] = None + @field_validator("langs", mode="before") + @classmethod + def parse_langs_str(cls, langs: str | list[str]) -> list[str]: + """ + Parse for get params 'langs' + """ + value_str = _prepare_str_list(langs) + if value_str: + langs = value_str.split(",") + else: + # we already know because of code logic that langs is the right type + # but we need to cast for mypy type checking + langs = cast(list[str], langs) + + return langs + @model_validator(mode="after") def validate_index_id(self): """ @@ -270,8 +262,7 @@ def validate_index_id(self): because we want to be able to substitute the default None value, by the default index """ - config.check_config_is_defined() - global_config = cast(config.Config, config.CONFIG) + global_config = config.get_config() check_index_id_is_defined(self.index_id, global_config) self.index_id, _ = global_config.get_index_config(self.index_id) return self @@ -287,25 +278,74 @@ def valid_index_id(self) -> str: return self.index_id @model_validator(mode="after") - def validate_q_or_sort_by(self): - """We want at least one of q or sort_by before launching a request""" - if self.q is None and self.sort_by is None: - raise ValueError("`sort_by` must be provided when `q` is missing") + def check_max_results(self): + """Check we don't ask too many results at once""" + if self.page * self.page_size > 10_000: + raise ValueError( + f"Maximum number of returned results is 10 000 (here: page * page_size = {self.page * self.page_size})", + ) return self @cached_property def index_config(self): """Get the index config once and for all""" - global_config = cast(config.Config, config.CONFIG) + global_config = config.get_config() _, index_config = global_config.get_index_config(self.index_id) return index_config @cached_property - def uses_sort_script(self): - """Does sort_by use a script?""" - index_config = self.index_config - _, sort_by = self.sign_sort_by - return sort_by in index_config.scripts.keys() + def langs_set(self): + return set(self.langs) + + @cached_property + def main_lang(self): + """Get the main lang of the query""" + return self.langs[0] if self.langs else "en" + + +class ResultSearchParameters(BaseModel): + """Parameters that influence results presentation: pagination and sorting""" + + fields: Annotated[ + list[str] | None, + Query( + description="List of fields to include in the response. All other fields will be ignored." + ), + ] = None + + page_size: Annotated[ + int, Query(description="Number of results to return per page.") + ] = 10 + + page: Annotated[int, Query(description="Number of results to return per page.")] = 1 + + sort_by: Annotated[ + str | None, + Query( + description=cd_( + """Field name to use to sort results, + the field should exist and be sortable. + If it is not provided, results are sorted by descending relevance score. + (aka best match) + + If you put a minus before the name, the results will be sorted by descending order. + + If the field name match a known script (defined in your configuration), + it will be use for sorting. + + In this case you also need to provide additional parameters corresponding to your script parameters. 
+ If a script needs parameters, you can only use the POST method. + + Beware that this may have a big [impact on performance][perf_link] + + Also bare in mind [privacy considerations][privacy_link] if your script parameters contains sensible data. + + [perf_link]: https://openfoodfacts.github.io/search-a-licious/users/how-to-use-scripts/#performance-considerations + [privacy_link]: https://openfoodfacts.github.io/search-a-licious/users/how-to-use-scripts/#performance-considerations + """ + ) + ), + ] = None @model_validator(mode="after") def sort_by_is_field_or_script(self): @@ -315,34 +355,45 @@ def sort_by_is_field_or_script(self): is_field = sort_by in index_config.fields # TODO: verify field type is compatible with sorting if not (self.sort_by is None or is_field or self.uses_sort_script): - raise ValueError("`sort_by` must be a valid field name or script name") + raise ValueError( + "`sort_by` must be a valid field name or script name or None" + ) return self - @model_validator(mode="after") - def sort_by_scripts_needs_params(self): - """If sort_by is a script, - verify we got corresponding parameters in sort_params - """ - if self.uses_sort_script: - if self.sort_params is None: - raise ValueError( - "`sort_params` must be provided when using a sort script" - ) - if not isinstance(self.sort_params, dict): - raise ValueError("`sort_params` must be a dict") - # verifies keys are those expected - request_keys = set(self.sort_params.keys()) - sort_sign, sort_by = self.sign_sort_by - expected_keys = set(self.index_config.scripts[sort_by].params.keys()) - if request_keys != expected_keys: - missing = expected_keys - request_keys - missing_str = ("missing keys: " + ", ".join(missing)) if missing else "" - new = request_keys - expected_keys - new_str = ("unexpected keys: " + ", ".join(new)) if new else "" - raise ValueError( - f"sort_params keys must match expected keys. {missing_str} {new_str}" - ) - return self + @cached_property + def uses_sort_script(self): + """Does sort_by use a script?""" + index_config = self.index_config + _, sort_by = self.sign_sort_by + return index_config.scripts and sort_by in index_config.scripts.keys() + + +class AggregateSearchParameters(BaseModel): + + facets: Annotated[ + list[str] | None, + Query( + description=cd_( + """Name of facets to return in the response as a comma-separated value. + If None (default) no facets are returned. + """ + ) + ), + ] = None + + charts: Annotated[ + list[ChartType] | None, + Query( + description=cd_( + """Name of vega representations to return in the response. + Can be distribution chart or scatter plot. + + If you pass a simple string, it will be interpreted as a distribution chart, + or a scatter plot if it is two fields separated by a column (x_axis_field:y_axis_field). 
+ """ + ) + ), + ] = None @model_validator(mode="after") def check_facets_are_valid(self): @@ -357,28 +408,23 @@ def check_charts_are_valid(self): """Check that the graph names are valid.""" if self.charts is None: return self - errors = check_all_values_are_fields_agg( self.index_id, [ chart.field for chart in self.charts - if chart.chart_type == "DistributionChartType" + if chart.chart_type == "DistributionChart" ], ) errors.extend( check_fields_are_numeric( self.index_id, - [ - chart.x - for chart in self.charts - if chart.chart_type == "ScatterChartType" - ] + [chart.x for chart in self.charts if chart.chart_type == "ScatterChart"] + [ chart.y for chart in self.charts - if chart.chart_type == "ScatterChartType" + if chart.chart_type == "ScatterChart" ], ) ) @@ -387,20 +433,7 @@ def check_charts_are_valid(self): raise ValueError(errors) return self - @model_validator(mode="after") - def check_max_results(self): - """Check we don't ask too many results at once""" - if self.page * self.page_size > 10_000: - raise ValueError( - f"Maximum number of returned results is 10 000 (here: page * page_size = {self.page * self.page_size})", - ) - return self - - @property - def langs_set(self): - return set(self.langs) - - @property + @cached_property def sign_sort_by(self) -> Tuple[str_utils.BoolOperator, str | None]: return ( ("+", None) @@ -408,31 +441,150 @@ def sign_sort_by(self) -> Tuple[str_utils.BoolOperator, str | None]: else str_utils.split_sort_by_sign(self.sort_by) ) - @property - def main_lang(self): - """Get the main lang of the query""" - return self.langs[0] if self.langs else "en" +def _prepare_str_list(item: Any) -> str | None: + if isinstance(item, str): + return item + elif isinstance(item, list) and all(isinstance(x, str) for x in item): + return ",".join(item) + return None + + +class SearchParameters( + QuerySearchParameters, ResultSearchParameters, AggregateSearchParameters +): + """Parameters for search, common to GET and POST""" + + # forbid extra parameters to prevent failed expectations because of typos + model_config = {"extra": "forbid"} + + debug_info: Annotated[ + list[DebugInfo] | None, + Query( + description=cd_( + """Tells which debug information to return in the response. + It can be a comma separated list of values + """ + ), + ), + ] = None + + @field_validator("debug_info", mode="before") + @classmethod + def debug_info_list_from_str( + cls, debug_info: str | list[str] | list[DebugInfo] | None + ) -> list[DebugInfo] | None: + """We can pass a comma separated list of DebugInfo values as a string""" + # as we are a before validator, we get a list + str_infos = _prepare_str_list(debug_info) + if str_infos: + values = [getattr(DebugInfo, part, None) for part in str_infos.split(",")] + debug_info = [v for v in values if v is not None] + if debug_info is not None: + # we already know because of code logic that debug_info is the right type + # but we need to cast for mypy type checking + debug_info = cast(list[DebugInfo], debug_info) + return debug_info + + +class GetSearchParameters(SearchParameters): + """GET parameters for search""" + + @field_validator("charts", mode="before") + @classmethod + def parse_charts_str( + cls, charts: str | list[str] | list[ChartType] | None + ) -> list[ChartType] | None: + """ + Parse for get params are 'field' or 'xfield:yfield' + separated by ',' for Distribution and Scatter charts. 
+ + Directly the dictionnaries in POST request + """ + str_charts = _prepare_str_list(charts) + if str_charts: + charts = [] + charts_list = str_charts.split(",") + for c in charts_list: + if ":" in c: + [x, y] = c.split(":") + charts.append(ScatterChart(x=x, y=y)) + else: + charts.append(DistributionChart(field=c)) + if charts is not None: + # we already know because of code logic that charts is the right type + # but we need to cast for mypy type checking + charts = cast(list[ChartType], charts) + return charts + + @model_validator(mode="after") + def validate_q_or_sort_by(self): + """We want at least one of q or sort_by before launching a request""" + if self.q is None and self.sort_by is None: + raise ValueError("`sort_by` must be provided when `q` is missing") + return self -def _annotation_new_type(type_, annotation): - """Use a new type for a given annotation""" - return Annotated[type_, *annotation.__metadata__] + @field_validator("facets", "fields", mode="before") + @classmethod + def parse_value_str(cls, value: str | list[str] | None) -> list[str] | None: + """ + Parse for get params 'langs' + """ + value_str = _prepare_str_list(value) + if value_str: + value = value_str.split(",") + if value is not None: + # we already know because of code logic that value is the right type + # but we need to cast for mypy type checking + value = cast(list[str], value) + return value + + @model_validator(mode="after") + def no_sort_by_scripts_on_get(self): + if self.uses_sort_script: + raise ValueError("`sort_by` must not be a script when using GET") + return self -# types for search parameters for GET -SEARCH_PARAMS_ANN = get_type_hints(SearchParameters, include_extras=True) +class PostSearchParameters(SearchParameters): + """POST parameters for search""" + sort_params: Annotated[ + JSONType | None, + Query( + description=cd_( + """Additional parameters when using a sort script in sort_by. + If the sort script needs parameters, you can only be used the POST method. + """ + ), + ), + ] = None -class GetSearchParamsTypes: - q = SEARCH_PARAMS_ANN["q"] - langs = _annotation_new_type(str, SEARCH_PARAMS_ANN["langs"]) - page_size = SEARCH_PARAMS_ANN["page_size"] - page = SEARCH_PARAMS_ANN["page"] - fields = _annotation_new_type(str, SEARCH_PARAMS_ANN["fields"]) - sort_by = SEARCH_PARAMS_ANN["sort_by"] - facets = _annotation_new_type(str, SEARCH_PARAMS_ANN["facets"]) - charts = _annotation_new_type(str, SEARCH_PARAMS_ANN["charts"]) - index_id = SEARCH_PARAMS_ANN["index_id"] + @model_validator(mode="after") + def sort_by_scripts_needs_params(self): + """If sort_by is a script, + verify we got corresponding parameters in sort_params + """ + if self.uses_sort_script: + if self.sort_params is None: + raise ValueError( + "`sort_params` must be provided when using a sort script" + ) + if not isinstance(self.sort_params, dict): + raise ValueError("`sort_params` must be a dict") + # verifies keys are those expected + request_keys = set(self.sort_params.keys()) + sort_sign, sort_by = self.sign_sort_by + expected_keys = set(self.index_config.scripts[sort_by].params.keys()) + if request_keys != expected_keys: + missing = expected_keys - request_keys + missing_str = ("missing keys: " + ", ".join(missing)) if missing else "" + new = request_keys - expected_keys + new_str = ("unexpected keys: " + ", ".join(new)) if new else "" + raise ValueError( + f"sort_params keys must match expected keys. 
{missing_str} {new_str}" + ) + return self class FetcherStatus(Enum): @@ -453,7 +605,11 @@ class FetcherStatus(Enum): class FetcherResult(BaseModel): - """Result for a document fecher""" + """Result for a document fetcher + + This is also used by pre-processors + who have the opportunity to discard an entry + """ status: FetcherStatus document: JSONType | None diff --git a/app/api.py b/app/api.py index bb11eafe..6f6fcc47 100644 --- a/app/api.py +++ b/app/api.py @@ -10,20 +10,17 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse from fastapi.templating import Jinja2Templates -from pydantic import ValidationError import app.search as app_search from app import config from app._types import ( - INDEX_ID_QUERY_PARAM, - DistributionChartType, - GetSearchParamsTypes, - ScatterChartType, - SearchParameters, + CommonParametersQuery, + GetSearchParameters, + PostSearchParameters, SearchResponse, SuccessSearchResponse, ) -from app.config import check_config_is_defined, settings +from app.config import settings from app.postprocessing import process_taxonomy_completion_response from app.query import build_completion_query from app.utils import connection, get_logger, init_sentry @@ -80,11 +77,11 @@ def check_index_id_is_defined_or_400(index_id: str | None, config: config.Config @app.get("/document/{identifier}") def get_document( - identifier: str, index_id: Annotated[str | None, INDEX_ID_QUERY_PARAM] = None + identifier: str, + index_id: Annotated[str | None, CommonParametersQuery.index_id] = None, ): """Fetch a document from Elasticsearch with specific ID.""" - check_config_is_defined() - global_config = cast(config.Config, config.CONFIG) + global_config = config.get_config() check_index_id_is_defined_or_400(index_id, global_config) index_id, index_config = global_config.get_index_config(index_id) @@ -104,71 +101,40 @@ def get_document( return product +def status_for_response(result: SearchResponse): + if isinstance(result, SuccessSearchResponse): + return status.HTTP_200_OK + else: + # TODO: should we refine that ? + return status.HTTP_500_INTERNAL_SERVER_ERROR + + @app.post("/search") -def search(search_parameters: Annotated[SearchParameters, Body()]): +def search( + response: Response, search_parameters: Annotated[PostSearchParameters, Body()] +): """This is the main search endpoint. It uses POST request to ensure privacy. Under the hood, it calls the :py:func:`app.search.search` function """ - return app_search.search(search_parameters) - - -def parse_charts_get(charts_params: str): - """ - Parse for get params are 'field' or 'xfield:yfield' - separated by ',' for Distribution and Scatter charts. 
- - Directly the dictionnaries in POST request - """ - charts = [] - for c in charts_params.split(","): - if ":" in c: - [x, y] = c.split(":") - charts.append(ScatterChartType(x=x, y=y)) - else: - charts.append(DistributionChartType(field=c)) - return charts + result = app_search.search(search_parameters) + response.status_code = status_for_response(result) + return result @app.get("/search") def search_get( - q: GetSearchParamsTypes.q = None, - langs: GetSearchParamsTypes.langs = None, - page_size: GetSearchParamsTypes.page_size = 10, - page: GetSearchParamsTypes.page = 1, - fields: GetSearchParamsTypes.fields = None, - sort_by: GetSearchParamsTypes.sort_by = None, - facets: GetSearchParamsTypes.facets = None, - charts: GetSearchParamsTypes.charts = None, - index_id: GetSearchParamsTypes.index_id = None, + response: Response, search_parameters: Annotated[GetSearchParameters, Query()] ) -> SearchResponse: """This is the main search endpoint when using GET request Under the hood, it calls the :py:func:`app.search.search` function """ - # str to lists - langs_list = langs.split(",") if langs else ["en"] - fields_list = fields.split(",") if fields else None - facets_list = facets.split(",") if facets else None - charts_list = parse_charts_get(charts) if charts else None - # create SearchParameters object - try: - search_parameters = SearchParameters( - q=q, - langs=langs_list, - page_size=page_size, - page=page, - fields=fields_list, - sort_by=sort_by, - facets=facets_list, - index_id=index_id, - charts=charts_list, - ) - return app_search.search(search_parameters) - except ValidationError as e: - raise HTTPException(status_code=400, detail=str(e)) + result = app_search.search(search_parameters) + response.status_code = status_for_response(result) + return result @app.get("/autocomplete") @@ -191,11 +157,10 @@ def taxonomy_autocomplete( int | None, Query(description="Fuzziness level to use, default to no fuzziness."), ] = None, - index_id: Annotated[str | None, INDEX_ID_QUERY_PARAM] = None, + index_id: Annotated[str | None, CommonParametersQuery.index_id] = None, ): """API endpoint for autocompletion using taxonomies""" - check_config_is_defined() - global_config = cast(config.Config, config.CONFIG) + global_config = config.get_config() check_index_id_is_defined_or_400(index_id, global_config) index_id, index_config = global_config.get_index_config(index_id) taxonomy_names_list = taxonomy_names.split(",") @@ -239,7 +204,7 @@ def html_search( page_size: int = 24, langs: str = "fr,en", sort_by: str | None = None, - index_id: Annotated[str | None, INDEX_ID_QUERY_PARAM] = None, + index_id: Annotated[str | None, CommonParametersQuery.index_id] = None, # Display debug information in the HTML response display_debug: bool = False, ): diff --git a/app/charts.py b/app/charts.py index 03efff66..39b3f46b 100644 --- a/app/charts.py +++ b/app/charts.py @@ -4,8 +4,8 @@ from ._types import ( ChartsInfos, ChartType, - DistributionChartType, - ScatterChartType, + DistributionChart, + ScatterChart, SuccessSearchResponse, ) @@ -44,7 +44,7 @@ def empty_chart(chart_name): def build_distribution_chart( - chart: DistributionChartType, values, index_config: config.IndexConfig + chart: DistributionChart, values, index_config: config.IndexConfig ): """ Return the vega structure for a Bar Chart @@ -139,7 +139,7 @@ def build_distribution_chart( def build_scatter_chart( - chart_option: ScatterChartType, search_result, index_config: config.IndexConfig + chart_option: ScatterChart, search_result, index_config: 
config.IndexConfig ): """ Build a scatter plot only for values from search_results @@ -242,7 +242,7 @@ def build_charts( aggregations = search_result.aggregations for requested_chart in requested_charts: - if requested_chart.chart_type == "ScatterChartType": + if requested_chart.chart_type == "ScatterChart": charts[f"{requested_chart.x}:{requested_chart.y}"] = build_scatter_chart( requested_chart, search_result, index_config ) diff --git a/app/cli/main.py b/app/cli/main.py index c3f001ed..e100c504 100644 --- a/app/cli/main.py +++ b/app/cli/main.py @@ -24,16 +24,14 @@ def _get_index_config( config_path: Optional[Path], index_id: Optional[str] ) -> tuple[str, "app.config.IndexConfig"]: - from typing import cast from app import config - from app.config import check_config_is_defined, set_global_config + from app.config import set_global_config if config_path: set_global_config(config_path) - check_config_is_defined() - global_config = cast(config.Config, config.CONFIG) + global_config = config.get_config() index_id, index_config = global_config.get_index_config(index_id) if index_config is None: raise typer.BadParameter( @@ -102,7 +100,6 @@ def import_data( start_time = time.perf_counter() index_id, index_config = _get_index_config(config_path, index_id) - num_errors = run_items_import( input_path, num_processes, @@ -131,24 +128,53 @@ def import_taxonomies( default=None, help=INDEX_ID_HELP, ), + skip_indexing: bool = typer.Option( + default=False, + help="Skip putting taxonomies in the ES index", + ), + skip_synonyms: bool = typer.Option( + default=False, + help="Skip creating synonyms files for ES analyzers", + ), ): """Import taxonomies into Elasticsearch. - It get taxonomies json files as specified in the configuration file. + It download taxonomies json files as specified in the configuration file. + + It creates taxonomies indexes (for auto-completion). + + It creates synonyms files for ElasticSearch analyzers + (enabling full text search to benefits from synonyms). 
""" import time - from app._import import perform_taxonomy_import - from app.utils import get_logger + from app._import import perform_refresh_synonyms, perform_taxonomy_import + from app.utils import connection, get_logger logger = get_logger() index_id, index_config = _get_index_config(config_path, index_id) - start_time = time.perf_counter() - perform_taxonomy_import(index_config) - end_time = time.perf_counter() - logger.info("Import time: %s seconds", end_time - start_time) + # open a connection for this process + connection.get_es_client(request_timeout=120, retry_on_timeout=True) + + if skip_indexing: + logger.info("Skipping indexing of taxonomies") + else: + start_time = time.perf_counter() + perform_taxonomy_import(index_config) + end_time = time.perf_counter() + logger.info("Import time: %s seconds", end_time - start_time) + if skip_synonyms: + logger.info("Skipping synonyms generation") + else: + start_time = time.perf_counter() + perform_refresh_synonyms( + index_id, + index_config, + ) + end_time = time.perf_counter() + logger.info("Synonyms generation time: %s seconds", end_time - start_time) @cli.command() @@ -181,6 +207,50 @@ def sync_scripts( ) +@cli.command() +def cleanup_indexes( + config_path: Optional[Path] = typer.Option( + default=None, + help="path of the yaml configuration file, it overrides CONFIG_PATH envvar", + dir_okay=False, + file_okay=True, + exists=True, + ), + index_id: Optional[str] = typer.Option( + default=None, + help=f"{INDEX_ID_HELP}\nIf not specified, all indexes are cleaned", + ), +): + """Clean old indexes that are not active anymore (no aliases) + + As you do full import of data or update taxonomies, + old indexes are not removed automatically. + (in the case you want to roll back or compare). + + This command will remove all indexes that are not active anymore. + """ + import time + + from app._import import perform_cleanup_indexes + from app.utils import get_logger + + logger = get_logger() + if index_id: + _, index_config = _get_index_config(config_path, index_id) + index_configs = [index_config] + else: + _get_index_config(config_path, None) # just to set global config variable + from app.config import get_config + + index_configs = list(get_config().indices.values()) + start_time = time.perf_counter() + removed = 0 + for index_config in index_configs: + removed += perform_cleanup_indexes(index_config) + end_time = time.perf_counter() + logger.info("Removed %d indexes in %s seconds", removed, end_time - start_time) + + @cli.command() def run_update_daemon( config_path: Optional[Path] = typer.Option( @@ -199,11 +269,10 @@ def run_update_daemon( It is optional but enables having an always up-to-date index, for applications where data changes. 
""" - from typing import cast from app import config from app._import import run_update_daemon - from app.config import check_config_is_defined, set_global_config, settings + from app.config import set_global_config, settings from app.utils import get_logger, init_sentry # Create root logger @@ -214,8 +283,7 @@ def run_update_daemon( if config_path: set_global_config(config_path) - check_config_is_defined() - global_config = cast(config.Config, config.CONFIG) + global_config = config.get_config() run_update_daemon(global_config) diff --git a/app/config.py b/app/config.py index 2be9d545..f820099b 100644 --- a/app/config.py +++ b/app/config.py @@ -1,3 +1,4 @@ +import functools import logging from enum import StrEnum, auto from inspect import cleandoc as cd_ @@ -5,7 +6,14 @@ from typing import Annotated, Any import yaml -from pydantic import BaseModel, Field, HttpUrl, field_validator, model_validator +from pydantic import ( + BaseModel, + Field, + FileUrl, + HttpUrl, + field_validator, + model_validator, +) from pydantic.json_schema import GenerateJsonSchema from pydantic_settings import BaseSettings @@ -121,6 +129,12 @@ class Settings(BaseSettings): description="User-Agent used when fetching resources (taxonomies) or documents" ), ] = "search-a-licious" + synonyms_path: Annotated[ + Path, + Field( + description="Path of the directory that will contain synonyms for ElasticSearch instances" + ), + ] = Path("/opt/search/synonyms") settings = Settings() @@ -199,7 +213,7 @@ class TaxonomySourceConfig(BaseModel): ), ] url: Annotated[ - HttpUrl, + FileUrl | HttpUrl, Field( description=cd_( """URL of the taxonomy. @@ -238,6 +252,7 @@ class FieldType(StrEnum): Tokenization will use analyzers specific to each languages. * taxonomy: a field akin to keyword but with support for matching using taxonomy synonyms and translations + (and in fact also a text mapping possibility) * disabled: a field that is not stored nor searchable (see [Elasticsearch help]) * object: this field contains a dict with sub-fields. @@ -340,7 +355,7 @@ class FieldConfig(BaseModel): It is used to return a 'faceted-view' with the number of results for each facet value, or to generate bar charts. - Only valid for keyword or numeric field types. + Only valid for keyword, taxonomy or numeric field types. """ ) ), @@ -356,23 +371,14 @@ class FieldConfig(BaseModel): ) ), ] = None - add_taxonomy_synonyms: Annotated[ - bool, - Field( - description=cd_( - """if True, add all synonyms of the taxonomy values to the index. - The flag is ignored if the field type is not `taxonomy`. - """ - ) - ), - ] = True @model_validator(mode="after") def bucket_agg_should_be_used_for_keyword_and_numeric_types_only(self): """Validator that checks that `bucket_agg` is only provided for fields with types `keyword`, `double`, `float`, `integer` or `bool`.""" if self.bucket_agg and not ( - self.type.is_numeric() or self.type in (FieldType.keyword, FieldType.bool) + self.type.is_numeric() + or self.type in (FieldType.keyword, FieldType.bool, FieldType.taxonomy) ): raise ValueError( "bucket_agg should be provided for taxonomy or numeric type only" @@ -484,35 +490,22 @@ class TaxonomyConfig(BaseModel): """Configuration of taxonomies, that is collections of entries with synonyms in multiple languages. + See [Explain taxonomies](../explain-taxonomies) + Field may be linked to taxonomies. It enables enriching search with synonyms, as well as providing suggestions, or informative facets. 
+ + Note: if you define taxonomies, you must import them using + [import-taxonomies command](../ref-python/cli.html#python3-m-app-import-taxonomies) """ sources: Annotated[ list[TaxonomySourceConfig], Field(description="Configurations of taxonomies that this project will use."), ] - exported_langs: Annotated[ - list[str], - Field( - description=cd_( - """a list of languages for which - we want taxonomized fields to be always exported during indexing. - - During indexing, we use the taxonomy to translate every taxonomized field - in a language-specific subfield. - - The list of language depends on the value defined here and on the optional - `taxonomy_langs` field that can be defined in each document. - - Beware that providing many language might inflate the index size. - """, - ) - ), - ] index: Annotated[ TaxonomyIndexConfig, Field(description=TaxonomyIndexConfig.__doc__), @@ -695,13 +688,38 @@ class IndexConfig(BaseModel): float, Field( description=cd_( - """How much we boost exact matches on individual fields + """How much we boost exact matches on consecutive words - This only makes sense when using "best match" order. + That is, if you search "Dark Chocolate", + it will boost entries that have the "Dark Chocolate" phrase (in the same field). + + It only applies to free text search. + + This only makes sense when using + "boost_phrase" request parameters and "best match" order. + + Note: this field accept float of string, + because using float might generate rounding problems. + The string must represent a float. """ ) ), ] = 2.0 + match_phrase_boost_proximity: Annotated[ + int | None, + Field( + description=cd_( + """How much we allow proximity for `match_phrase_boost`. + + If unspecified we will just match word to word. + Otherwise it will allow some gap between words matching + + This only makes sense when using + "boost_phrase" request parameters and "best match" order. + """ + ) + ), + ] = None document_denylist: Annotated[ set[str], Field( @@ -764,6 +782,15 @@ def field_references_must_exist_and_be_valid(self): return self + @field_validator("fields") + @classmethod + def ensure_no_fields_use_reserved_name(cls, fields: dict[str, FieldConfig]): + """Verify that no field name clashes with a reserved name""" + used_reserved = set(["last_indexed_datetime", "_id"]) & set(fields.keys()) + if used_reserved: + raise ValueError(f"The field names {','.join(used_reserved)} are reserved") + return fields + @field_validator("fields") @classmethod def add_field_name_to_each_field(cls, fields: dict[str, FieldConfig]): @@ -772,31 +799,42 @@ def add_field_name_to_each_field(cls, fields: dict[str, FieldConfig]): field_item.name = field_name return fields - def get_supported_langs(self) -> set[str]: - """Return the set of supported languages for `text_lang` fields. - - It's used to know which language-specific subfields to create. - """ - return ( - set(self.supported_langs or []) - # only keep langs for which a built-in analyzer built-in, other - # langs will be stored in a unique `other` subfield - ) & set(ANALYZER_LANG_MAPPING) - - def get_taxonomy_langs(self) -> set[str]: - """Return the set of exported languages for `taxonomy` fields. - - It's used to know which language-specific subfields to create. 
- """ - # only keep langs for which a built-in analyzer built-in, other - # langs will be stored in a unique `other` subfield - return (set(self.taxonomy.exported_langs)) & set(ANALYZER_LANG_MAPPING) - def get_fields_with_bucket_agg(self): return [ field_name for field_name, field in self.fields.items() if field.bucket_agg ] + @functools.cached_property + def text_lang_fields(self) -> dict[str, FieldConfig]: + """List all text_lang fields in an efficient way""" + return { + field_name: field + for field_name, field in self.fields.items() + if field.type == FieldType.text_lang + } + + @functools.cached_property + def supported_langs_set(self): + return frozenset(self.supported_langs) + + @functools.cached_property + def lang_fields(self) -> dict[str, FieldConfig]: + """Fully qualified name of fields that are translated""" + return { + fname: field + for fname, field in self.fields.items() + if field.type in ["taxonomy", "text_lang"] + } + + @functools.cached_property + def full_text_fields(self) -> dict[str, FieldConfig]: + """Fully qualified name of fields that are part of default full text search""" + return { + fname: field + for fname, field in self.fields.items() + if field.full_text_search + } + CONFIG_DESCRIPTION_INDICES = """ A Search-a-licious instance only have one configuration file, @@ -870,25 +908,33 @@ def from_yaml(cls, path: Path) -> "Config": return cls(**data) -# CONFIG is a global variable that contains the search-a-licious configuration +# _CONFIG is a global variable that contains the search-a-licious configuration # used. It is specified by the envvar CONFIG_PATH. -CONFIG: Config | None = None -if settings.config_path: - if not settings.config_path.is_file(): - raise RuntimeError(f"config file does not exist: {settings.config_path}") +# use get_config() to access it. 
+_CONFIG: Config | None = None - CONFIG = Config.from_yaml(settings.config_path) +def get_config() -> Config: + """Return the object containing global configuration -def check_config_is_defined(): - """Raise a RuntimeError if the Config path is not set.""" - if CONFIG is None: + It raises if configuration was not yet set + """ + if _CONFIG is None: raise RuntimeError( "No configuration is configured, set envvar " "CONFIG_PATH with the path of the yaml configuration file" ) + return _CONFIG def set_global_config(config_path: Path): - global CONFIG - CONFIG = Config.from_yaml(config_path) + global _CONFIG + _CONFIG = Config.from_yaml(config_path) + return _CONFIG + + +if settings.config_path: + if not settings.config_path.is_file(): + raise RuntimeError(f"config file does not exist: {settings.config_path}") + + set_global_config(settings.config_path) diff --git a/app/es_query_builder.py b/app/es_query_builder.py new file mode 100644 index 00000000..8ce9a071 --- /dev/null +++ b/app/es_query_builder.py @@ -0,0 +1,187 @@ +"""This module creates a specific ESQueryBuilder, +that will be able to handle the full text search correctly +""" + +import luqum +from luqum.elasticsearch.tree import EPhrase, EWord +from luqum.elasticsearch.visitor import ElasticsearchQueryBuilder + +from ._types import JSONType +from .config import IndexConfig +from .exceptions import FreeWildCardError, QueryAnalysisError + +DEFAULT_FIELD_MARKER = "_searchalicious_text" + + +class FullTextMixin: + """Implementation of json query transformation to use a query + on all needed field according to full_text_search configurations + """ + + # attributes provided in implementations + index_config: IndexConfig + query_langs: list[str] + + MATCH_TO_MULTI_MATCH_TYPE = { + "match": "best_fields", + "match_phrase": "phrase", + } + + @property + def full_text_query_fields(self) -> list[str]: + """List fields to match upon in full text search""" + fields = [] + lang_fields = set(self.index_config.lang_fields) + supported_langs = set(self.index_config.supported_langs) & set(self.query_langs) + + for fname, field in self.index_config.full_text_fields.items(): + if fname in lang_fields: + for lang in supported_langs: + subfield_name = f"{field.name}.{lang}" + fields.append(subfield_name) + else: + fields.append(field.name) + return fields + + def _transform_query(self, query): + """Transform the query generated by luqum transformer + to a query on all necessary fields. 
+ """ + fields = self.full_text_query_fields + if "exists" in query: + raise FreeWildCardError( + "Free wildcards are not allowed in full text queries" + ) + if "query_string" in query: + # no need to transform, just add fields + query["query_string"]["fields"] = fields + elif "match" in query or "match_phrase" in query: + query_type = list(k for k in query.keys() if k.startswith("match"))[0] + # go for multi_match + inner_json = query["multi_match"] = query.pop(query_type) + inner_json.update(inner_json.pop(self.field)) + inner_json["fields"] = fields + inner_json["type"] = self.MATCH_TO_MULTI_MATCH_TYPE[query_type] + else: + raise QueryAnalysisError( + f"Unexpected query type while analyzing full text query: {query.keys()}" + ) + return query + + +class EFullTextWord(EWord, FullTextMixin): + """Item that may generates a multi_match for word on default field""" + + def __init__(self, index_config: IndexConfig, query_langs=list[str], **kwargs): + super().__init__(**kwargs) + self.index_config = index_config + self.query_langs = query_langs + + @property + def json(self): + """Generate the JSON specific to our requests""" + # let's use the normal way to generate the json + query = super().json + # but modify request if we are on default field + if self.field == DEFAULT_FIELD_MARKER: + query = self._transform_query(query) + return query + + +class EFullTextPhrase(EPhrase, FullTextMixin): + """Item that generates a multi_match for phrase on default field""" + + def __init__(self, index_config: IndexConfig, query_langs=list[str], **kwargs): + super().__init__(**kwargs) + self.index_config = index_config + self.query_langs = query_langs + + @property + def json(self): + """Generate the JSON specific to our requests""" + # let's use the normal way to generate the json + query = super().json + # but modify request depending on the request type + if self.field == DEFAULT_FIELD_MARKER: + query = self._transform_query(query) + return query + + +class FullTextQueryBuilder(ElasticsearchQueryBuilder): + """We have our own ESQueryBuilder, + just to be able to use our FullTextItemFactory, + instead of the default ElasticSearchItemFactory + """ + + def __init__(self, **kwargs): + # sanity check, before overriding below + if "default_field" in kwargs: + raise NotImplementedError("You should not override default_field") + super().__init__( + # we put a specific marker on default_field + # because we want to be sure we recognize them + default_field=DEFAULT_FIELD_MARKER, + **kwargs, + ) + + # fix until https://github.com/jurismarches/luqum/issues/106 is resolved + def _get_operator_extract(self, binary_operation, delta=8): + try: + return super()._get_operator_extract(binary_operation, delta) + except IndexError: + return str(binary_operation) + + def visit_word(self, node, context): + """Specialize the query corresponding to word, + in the case of full text search + """ + fields = self._fields(context) + if fields == [DEFAULT_FIELD_MARKER]: + # we are in a full text query + # it's analyzed, don't bother with term + method = "match_phrase" if self.match_word_as_phrase else "match" + yield self.es_item_factory.build( + EFullTextWord, + q=node.value, + method=method, + # we keep fields, we will deal with it in EFullTextWord + fields=fields, + _name=self.get_name(node, context), + index_config=context["index_config"], + query_langs=context["query_langs"], + ) + else: + yield from super().visit_word(node, context) + + def visit_phrase(self, node, context): + """Specialize the query corresponding to phrase, + in 
the case of full text search + """ + fields = self._fields(context) + if fields == [DEFAULT_FIELD_MARKER]: + # we are in a full text query + # we know it's analyzed, don't bother with term + yield self.es_item_factory.build( + EFullTextPhrase, + phrase=node.value, + fields=self._fields(context), + _name=self.get_name(node, context), + index_config=context["index_config"], + query_langs=context["query_langs"], + ) + else: + yield from super().visit_phrase(node, context) + + def __call__( + self, tree: luqum.tree.Item, index_config: IndexConfig, query_langs: list[str] + ) -> JSONType: + """We add two parameters: + + :param index_config: the index config we are working on + :param query_langs: the target languages of current query + """ + self.nesting_checker(tree) + # we add our parameters to the context + context = {"index_config": index_config, "query_langs": query_langs} + elastic_tree = self.visit(tree, context) + return elastic_tree[0].json diff --git a/app/exceptions.py b/app/exceptions.py new file mode 100644 index 00000000..b338a271 --- /dev/null +++ b/app/exceptions.py @@ -0,0 +1,35 @@ +class QueryAnalysisError(Exception): + """Exception while building a query.""" + + +class InvalidLuceneQueryError(QueryAnalysisError): + """Invalid query, can't be analyzed by luqum""" + + +class FreeWildCardError(QueryAnalysisError): + """You can't use '*' alone without specifying a search field""" + + +class UnknownFieldError(QueryAnalysisError): + """An unknown field name was used in the query""" + + +class UnknownScriptError(QueryAnalysisError): + """An unknown script name was used in the query""" + + +class QueryCheckError(QueryAnalysisError): + """Encountered errors while checking Query""" + + def __init__(self, *args, errors: list[str]): + super().__init__(*args) + self.errors = errors + + def __str__(self): + errors = "\n - " + "\n - ".join(self.errors) + return f"{', '.join(self.args)}: {errors}" + + def __repr__(self): + return ( + f"{self.__class__.__name__}({', '.join(self.args)}, errors={self.errors})" + ) diff --git a/app/facets.py b/app/facets.py index 4b633a62..d7df76ad 100644 --- a/app/facets.py +++ b/app/facets.py @@ -17,6 +17,13 @@ def _get_translations( lang: str, items: list[tuple[str, str]], index_config: config.IndexConfig ) -> dict[tuple[str, str], str]: + """Get translations for a list of items + + :param lang: target language + :param items: list of (entry id, field_name) + :param index_config: the index configuration + :return: a dict mapping (id, field_name) to the translation + """ # go from field_name to taxonomy field_names = set([field_name for _, field_name in items]) field_taxonomy: dict[str, str] = { @@ -25,7 +32,7 @@ def _get_translations( for field_name in field_names if index_config.fields[field_name].taxonomy_name } - # fetch items names + # fetch items names within a single query items_to_fetch = [ (id, field_taxonomy[field_name]) for id, field_name in items @@ -35,24 +42,24 @@ def _get_translations( # compute best translations translations: dict[tuple[str, str], str] = {} for id, field_name in items: - item_translations = None + item_translation = None names = ( items_names.get((id, field_taxonomy[field_name])) if field_name in field_taxonomy else None ) if names: - item_translations = names.get(lang, None) + item_translation = names.get(lang, None) # fold back to main language for item - if not item_translations: + if not item_translation: main_lang = id.split(":", 1)[0] - item_translations = names.get(main_lang, None) + item_translation = 
names.get(main_lang, None) # fold back to english - if not translations: - item_translations = names.get("en", None) + if not item_translation: + item_translation = names.get("en", None) # eventually translate - if item_translations: - translations[(id, field_name)] = item_translations[0] + if item_translation: + translations[(id, field_name)] = item_translation return translations diff --git a/app/indexing.py b/app/indexing.py index 2c48e32d..9d09f776 100644 --- a/app/indexing.py +++ b/app/indexing.py @@ -14,11 +14,14 @@ FieldType, IndexConfig, TaxonomyConfig, - TaxonomySourceConfig, ) -from app.taxonomy import get_taxonomy from app.utils import load_class_object_from_string -from app.utils.analyzers import AUTOCOMPLETE_ANALYZERS +from app.utils.analyzers import ( + get_autocomplete_analyzer, + get_taxonomy_indexing_analyzer, + get_taxonomy_search_analyzer, + number_of_fields, +) FIELD_TYPE_TO_DSL_TYPE = { FieldType.keyword: dsl_field.Keyword, @@ -37,46 +40,46 @@ def generate_dsl_field( - field: FieldConfig, supported_langs: Iterable[str], taxonomy_langs: Iterable[str] + field: FieldConfig, supported_langs: Iterable[str] ) -> dsl_field.Field: """Generate Elasticsearch DSL field from a FieldConfig. + This will be used to generate the Elasticsearch mapping. + + This is an important part, because it will define the behavior of each field. + :param field: the field to use as input :param supported_langs: an iterable of languages (2-letter codes), - used to know which sub-fields to create for `text_lang` field types - :param taxonomy_langs: an iterabl of languages (2-letter codes), - used to know which sub-fields to create for `taxonomy` field types + used to know which sub-fields to create for `text_lang` + and `taxonomy` field types :return: the elasticsearch_dsl field """ if field.type is FieldType.taxonomy: - # in `other`, we store the text of all languages that don't have a - # built-in ES analyzer. 
By using a single field, we don't create as - # many subfields as there are supported languages - properties = {"other": dsl_field.Text(analyzer=analyzer("standard"))} - for lang in taxonomy_langs: - if lang in ANALYZER_LANG_MAPPING: - properties[lang] = dsl_field.Text( - analyzer=analyzer(ANALYZER_LANG_MAPPING[lang]) - ) - return dsl_field.Object( - required=field.required, dynamic=False, properties=properties - ) - + # We will store the taxonomy identifier as keyword + # And also store it in subfields with query analyzers for each language, + # that will activate synonyms and specific normalizations + if field.taxonomy_name is None: + raise ValueError("Taxonomy field must have a taxonomy_name set in config") + sub_fields = { + lang: dsl_field.Text( + # we almost use keyword analyzer as we really map synonyms to a keyword + analyzer=get_taxonomy_indexing_analyzer(field.taxonomy_name, lang), + # but on query we need to fold and match with synonyms + search_analyzer=get_taxonomy_search_analyzer( + field.taxonomy_name, lang, with_synonyms=True + ), + ) + for lang in supported_langs + } + return dsl_field.Keyword(required=field.required, fields=sub_fields) elif field.type is FieldType.text_lang: properties = { - # we use `other` field for the same reason as for the `taxonomy` - # type - "other": dsl_field.Text(analyzer=analyzer("standard")), - # Add subfield used to save main language version for `text_lang` - "main": dsl_field.Text(analyzer=analyzer("standard")), + lang: dsl_field.Text( + analyzer=analyzer(ANALYZER_LANG_MAPPING.get(lang, "standard")), + ) + for lang in supported_langs } - for lang in supported_langs: - if lang in ANALYZER_LANG_MAPPING: - properties[lang] = dsl_field.Text( - analyzer=analyzer(ANALYZER_LANG_MAPPING[lang]) - ) return dsl_field.Object(dynamic=False, properties=properties) - elif field.type == FieldType.object: return dsl_field.Object(dynamic=True) elif field.type == FieldType.disabled: @@ -170,12 +173,7 @@ def process_text_lang_field( else: # here key is the lang 2-letters code key = target_field.rsplit(lang_separator, maxsplit=1)[-1] - # Here we check whether the language is supported, otherwise - # we use the default "other" field, that aggregates texts - # from all unsupported languages - # it's the only subfield that is a list instead of a string if key not in supported_langs: - field_input.setdefault("other", []).append(input_value) continue field_input[key] = input_value @@ -188,82 +186,25 @@ def process_taxonomy_field( field: FieldConfig, taxonomy_config: TaxonomyConfig, split_separator: str, - taxonomy_langs: set[str], ) -> JSONType | None: """Process data for a `taxonomy` field type. - Generates a dict ready to be indexed by Elasticsearch, with a subfield for - each language. Two other subfields are added: - - - `original`: the original value of the field. For example, if the field - name is `categories` and `categories` already exist in the document, - we will save its value in the `original` subfield. This subfield is - only added if the field is present in the input data. - - - `other`: the value of the field for languages that are not supported by - the project (no elasticsearch specific analyzers) + There is not much to be done here, + as the magic of synonyms etc. happens by ES itself, + thanks to our mapping definition, + and a bit at query time. 
:param data: input data, as a dict :param field: the field config - :param taxonomy_config: the taxonomy config :param split_separator: the separator used to split the input field value, in case of multi-valued input (if `field.split` is True) - :param taxonomy_langs: a set of supported languages (2-letter codes), used - to know which sub-fields to create. - :return: the processed data, as a dict + :return: the processed value """ - field_input: JSONType = {} input_field = field.get_input_field() input_value = preprocess_field_value( data, input_field, split=field.split, split_separator=split_separator ) - if input_value is None: - return None - - taxonomy_sources_by_name = { - source.name: source for source in taxonomy_config.sources - } - taxonomy_source_config: TaxonomySourceConfig = taxonomy_sources_by_name[ - field.taxonomy_name # type: ignore - ] - taxonomy = get_taxonomy( - taxonomy_source_config.name, str(taxonomy_source_config.url) - ) - - # to know in which language we should translate the tags using the - # taxonomy, we use: - # - the language list defined in the taxonomy config: for every item, we - # translate the tags for this list of languages - # - a custom list of supported languages for the item (`taxonomy_langs` - # field), this is used to allow indexing tags for an item that is available - # in specific countries - langs = taxonomy_langs | set(data.get("taxonomy_langs", [])) - for lang in langs: - for single_tag in input_value: - if single_tag not in taxonomy: - continue - - node = taxonomy[single_tag] - values = {node.get_localized_name(lang)} - - if field.add_taxonomy_synonyms: - values |= set(node.get_synonyms(lang)) - - # Add international version of the name - if "xx" in node.names: - values |= set(node.get_synonyms("xx")) - - for value in values: - if value is not None: - # If language is not supported (=no elasticsearch specific - # analyzers), we store the data in a "other" field - key = lang if lang in ANALYZER_LANG_MAPPING else "other" - field_input.setdefault(key, []).append(value) - - if field.name in data: - field_input["original"] = data[field.name] - - return field_input if field_input else None + return input_value if input_value else None class DocumentProcessor: @@ -273,8 +214,7 @@ class DocumentProcessor: def __init__(self, config: IndexConfig) -> None: self.config = config - self.supported_langs = config.get_supported_langs() - self.taxonomy_langs = config.get_taxonomy_langs() + self.supported_langs_set = config.supported_langs_set self.preprocessor: BaseDocumentPreprocessor | None if config.preprocessor is not None: @@ -283,6 +223,47 @@ def __init__(self, config: IndexConfig) -> None: else: self.preprocessor = None + def inputs_from_data(self, id_, processed_data: JSONType) -> JSONType: + """Generate a dict with the data to be indexed in ES""" + inputs = { + "last_indexed_datetime": datetime.datetime.utcnow().isoformat(), + "_id": id_, + } + for field in self.config.fields.values(): + input_field = field.get_input_field() + + if field.type == FieldType.text_lang: + # dispath languages in a sub-dictionary + field_input = process_text_lang_field( + processed_data, + input_field=field.get_input_field(), + split=field.split, + lang_separator=self.config.lang_separator, + split_separator=self.config.split_separator, + supported_langs=self.supported_langs_set, + ) + # nothing to do, all the magic of subfield is done thanks to ES + elif field.type == FieldType.taxonomy: + field_input = process_taxonomy_field( + data=processed_data, + field=field, + 
taxonomy_config=self.config.taxonomy, + split_separator=self.config.split_separator, + ) + + else: + field_input = preprocess_field_value( + processed_data, + input_field, + split=field.split, + split_separator=self.config.split_separator, + ) + + if field_input: + inputs[field.name] = field_input + + return inputs + def from_result(self, result: FetcherResult) -> FetcherResult: """Generate an item ready to be indexed by elasticsearch-dsl from a fetcher result. @@ -325,87 +306,72 @@ def from_result(self, result: FetcherResult) -> FetcherResult: processed_data = processed_result.document - inputs = { - "last_indexed_datetime": datetime.datetime.utcnow().isoformat(), - "_id": _id, - } - for field in self.config.fields.values(): - input_field = field.get_input_field() - - if field.type == FieldType.text_lang: - field_input = process_text_lang_field( - processed_data, - input_field=field.get_input_field(), - split=field.split, - lang_separator=self.config.lang_separator, - split_separator=self.config.split_separator, - supported_langs=self.supported_langs, - ) - - elif field.type == FieldType.taxonomy: - field_input = process_taxonomy_field( - data=processed_data, - field=field, - taxonomy_config=self.config.taxonomy, - split_separator=self.config.split_separator, - taxonomy_langs=self.taxonomy_langs, - ) - - else: - field_input = preprocess_field_value( - processed_data, - input_field, - split=field.split, - split_separator=self.config.split_separator, - ) - - if field_input: - inputs[field.name] = field_input + inputs = self.inputs_from_data(_id, processed_data) return FetcherResult(status=processed_result.status, document=inputs) def generate_mapping_object(config: IndexConfig) -> Mapping: + """ES Mapping for project index, that will contain the data""" mapping = Mapping() supported_langs = config.supported_langs - taxonomy_langs = config.taxonomy.exported_langs + # note: when we reference new analyzers in the mapping as analyzers objects, + # Elasticsearch DSL will reference them in the analyzer section by itself for field in config.fields.values(): mapping.field( field.name, - generate_dsl_field( - field, supported_langs=supported_langs, taxonomy_langs=taxonomy_langs - ), + generate_dsl_field(field, supported_langs=supported_langs), ) # date of last index for the purposes of search + # this is a field internal to Search-a-licious and independent of the project mapping.field("last_indexed_datetime", dsl_field.Date(required=True)) return mapping def generate_index_object(index_name: str, config: IndexConfig) -> Index: + """Index configuration for project index, that will contain the data""" index = Index(index_name) - index.settings( - number_of_shards=config.index.number_of_shards, - number_of_replicas=config.index.number_of_replicas, - ) + settings = { + "number_of_shards": config.index.number_of_shards, + "number_of_replicas": config.index.number_of_replicas, + } mapping = generate_mapping_object(config) + num_fields = number_of_fields(mapping) + # add 25% margin + num_fields = int(num_fields * 1.25) + if num_fields > 1000: + # default limit is 1000 fields, set a specific one + settings["index.mapping.total_fields.limit"] = num_fields + index.settings(**settings) index.mapping(mapping) return index def generate_taxonomy_mapping_object(config: IndexConfig) -> Mapping: + """ES Mapping for indexes containing taxonomies entries""" mapping = Mapping() supported_langs = config.supported_langs mapping.field("id", dsl_field.Keyword(required=True)) mapping.field("taxonomy_name", 
dsl_field.Keyword(required=True)) mapping.field( - "names", + "name", + dsl_field.Object( + required=True, + dynamic=False, + properties={ + lang: dsl_field.Keyword(required=False) for lang in supported_langs + }, + ), + ), + mapping.field( + "synonyms", dsl_field.Object( required=True, dynamic=False, properties={ lang: dsl_field.Completion( - analyzer=AUTOCOMPLETE_ANALYZERS.get(lang, "simple"), + analyzer=get_autocomplete_analyzer(lang), contexts=[ { "name": "taxonomy_name", @@ -422,6 +388,9 @@ def generate_taxonomy_mapping_object(config: IndexConfig) -> Mapping: def generate_taxonomy_index_object(index_name: str, config: IndexConfig) -> Index: + """ + Index configuration for indexes containing taxonomies entries + """ index = Index(index_name) taxonomy_index_config = config.taxonomy.index index.settings( diff --git a/app/openfoodfacts.py b/app/openfoodfacts.py index a319aff8..1a4adc6e 100644 --- a/app/openfoodfacts.py +++ b/app/openfoodfacts.py @@ -9,7 +9,6 @@ from app._types import FetcherResult, FetcherStatus, JSONType from app.indexing import BaseDocumentPreprocessor from app.postprocessing import BaseResultProcessor -from app.taxonomy import get_taxonomy from app.utils.download import http_session from app.utils.log import get_logger @@ -121,42 +120,29 @@ def fetch_document(self, stream_name: str, item: JSONType) -> FetcherResult: class DocumentPreprocessor(BaseDocumentPreprocessor): + def preprocess(self, document: JSONType) -> FetcherResult: # no need to have a deep-copy here document = copy.copy(document) # convert obsolete field into bool document["obsolete"] = bool(document.get("obsolete")) - document["taxonomy_langs"] = self.get_taxonomy_langs(document) + # add "main" language to text_lang fields + self.add_main_language(document) # Don't keep all nutriment values self.select_nutriments(document) return FetcherResult(status=FetcherStatus.FOUND, document=document) - def get_taxonomy_langs(self, document: JSONType) -> list[str]: - # We add `taxonomy_langs` field to index taxonomized fields in - # the language of the product. To determine the list of - # `taxonomy_langs`, we check: - # - `languages_code` - # - `countries_tags`: we add every official language of the countries - # where the product can be found. - taxonomy_langs = set(document.get("languages_codes", [])) - countries_tags = document.get("countries_tags", []) - country_taxonomy = get_taxonomy("country", COUNTRIES_TAXONOMY_URL) - - for country_tag in countries_tags: - # Check that `country_tag` is in taxonomy - if (country_node := country_taxonomy[country_tag]) is not None: - # Get all official languages of the country, and add them to - # `taxonomy_langs` - if ( - lang_codes := country_node.properties.get("language_codes", {}).get( - "en" - ) - ) is not None: - taxonomy_langs |= set( - lang_code for lang_code in lang_codes.split(",") if lang_code - ) + def add_main_language(self, document: JSONType) -> None: + """We add a "main" language to translated fields (text_lang and taxonomies) - return list(taxonomy_langs) + This enables searching in the main language of the product. + This is important because most of the time, + products have no entry for a lot of language, + so this is an interesting fall-back. 
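Editor's note: a minimal sketch of this fall-back on a document (invented values, assuming `product_name` is declared in `text_lang_fields`; the real logic is the short loop that follows in the patch):

    doc = {"lang": "it", "product_name": "Pasta integrale", "product_name_fr": "Pâtes complètes"}
    for field in ["product_name"]:  # stands in for self.config.text_lang_fields
        if field in doc:
            # duplicate the untranslated value under the "main" pseudo-language
            doc[field + "_main"] = doc[field]
    assert doc["product_name_main"] == "Pasta integrale"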
+ """ + for field in self.config.text_lang_fields: + if field in document: + document[field + "_main"] = document[field] def select_nutriments(self, document: JSONType): """Only selected interesting nutriments, as there are hundreds of @@ -191,6 +177,9 @@ def process_after(self, result: JSONType) -> JSONType: @staticmethod def build_image_fields(product: JSONType): + """Images are stored in a weird way in Open Food Facts, + We want to make it far more simple to use in results. + """ # Python copy of the code from # https://github.com/openfoodfacts/openfoodfacts-server/blob/b297ed858d526332649562cdec5f1d36be184984/lib/ProductOpener/Display.pm#L10128 code = product["code"] diff --git a/app/postprocessing.py b/app/postprocessing.py index 890f1fdb..40d0f87a 100644 --- a/app/postprocessing.py +++ b/app/postprocessing.py @@ -1,7 +1,7 @@ from elasticsearch_dsl.response import Response from app._types import JSONType -from app.config import FieldType, IndexConfig +from app.config import IndexConfig from app.utils import load_class_object_from_string @@ -10,6 +10,9 @@ def __init__(self, config: IndexConfig) -> None: self.config = config def process(self, response: Response, projection: set[str] | None) -> JSONType: + """Post process results to add some information, + or transform results to flatten them + """ output = { "took": response.took, "timed_out": response.timed_out, @@ -21,17 +24,16 @@ def process(self, response: Response, projection: set[str] | None) -> JSONType: result = hit.to_dict() result["_score"] = hit.meta.score - for field in self.config.fields.values(): - if field.name not in result: + # TODO make it an unsplit option or move to specific off post processing + for fname in self.config.text_lang_fields: + if fname not in result: continue - - if field.type is FieldType.text_lang: - lang_values = result.pop(field.name) - for lang, text in lang_values.items(): - suffix = "" if lang == "main" else f"_{lang}" - result[f"{field.name}{suffix}"] = text - elif field.type is FieldType.taxonomy: - result[field.name] = result.pop(field.name)["original"] + # Flatten the language dict + lang_values = result.pop(fname) + for lang, text in lang_values.items(): + # FIXME: this reproduces OFF behaviour, but is this a good thing? 
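Editor's note: a minimal sketch of what this flattening produces for a single hit (invented values; "main" is the pseudo-language added at indexing time):

    hit = {"product_name": {"main": "Whole milk", "fr": "Lait entier"}}
    flat = {}
    for lang, text in hit.pop("product_name").items():
        suffix = "" if lang == "main" else f"_{lang}"
        flat[f"product_name{suffix}"] = text
    assert flat == {"product_name": "Whole milk", "product_name_fr": "Lait entier"}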
+ suffix = "" if lang == "main" else f"_{lang}" + result[f"{fname}{suffix}"] = text result = self.process_after(result) if projection: diff --git a/app/query.py b/app/query.py index 66fa7968..b5299902 100644 --- a/app/query.py +++ b/app/query.py @@ -1,13 +1,13 @@ import elastic_transport import elasticsearch import luqum.exceptions -from elasticsearch_dsl import A, Q, Search +from elasticsearch_dsl import A, Search from elasticsearch_dsl.aggs import Agg -from elasticsearch_dsl.query import Query from luqum import tree -from luqum.elasticsearch import ElasticsearchQueryBuilder from luqum.elasticsearch.schema import SchemaAnalyzer +from luqum.elasticsearch.visitor import ElasticsearchQueryBuilder from luqum.parser import parser +from luqum.utils import OpenRangeTransformer, UnknownOperationResolver from ._types import ( ErrorSearchResponse, @@ -20,9 +20,16 @@ SuccessSearchResponse, ) from .config import FieldType, IndexConfig +from .es_query_builder import FullTextQueryBuilder from .es_scripts import get_script_id +from .exceptions import InvalidLuceneQueryError, QueryCheckError, UnknownScriptError from .indexing import generate_index_object from .postprocessing import BaseResultProcessor +from .query_transformers import ( + LanguageSuffixTransformer, + PhraseBoostTransformer, + QueryCheck, +) from .utils import get_logger, str_utils logger = get_logger(__name__) @@ -35,146 +42,27 @@ def build_elasticsearch_query_builder(config: IndexConfig) -> ElasticsearchQuery options = SchemaAnalyzer(index.to_dict()).query_builder_options() # we default to a AND between terms that are just space separated options["default_operator"] = ElasticsearchQueryBuilder.MUST - return ElasticsearchQueryBuilder(**options) - - -def build_query_clause(query: str, langs: list[str], config: IndexConfig) -> Query: - fields = [] - supported_langs = config.get_supported_langs() - taxonomy_langs = config.get_taxonomy_langs() - match_phrase_boost_queries = [] - - for field in config.fields.values(): - # We don't include all fields in the multi-match clause, only a subset - # of them - if field.full_text_search: - if field.type in (FieldType.taxonomy, FieldType.text_lang): - # language subfields are not the same depending on whether the - # field is a `taxonomy` or a `text_lang` field - langs_subset = frozenset( - supported_langs - if field.type is FieldType.text_lang - else taxonomy_langs - ) - field_match_phrase_boost_queries = [] - for lang in (_lang for _lang in langs if _lang in langs_subset): - subfield_name = f"{field.name}.{lang}" - fields.append(subfield_name) - field_match_phrase_boost_queries.append( - Q( - "match_phrase", - **{ - subfield_name: { - "query": query, - "boost": config.match_phrase_boost, - } - }, - ) - ) - if len(field_match_phrase_boost_queries) == 1: - match_phrase_boost_queries.append( - field_match_phrase_boost_queries[0] - ) - elif len(field_match_phrase_boost_queries) > 1: - match_phrase_boost_queries.append( - Q("bool", should=field_match_phrase_boost_queries) - ) - - else: - fields.append(field.name) - match_phrase_boost_queries.append( - Q( - "match_phrase", - **{ - field.name: { - "query": query, - "boost": config.match_phrase_boost, - } - }, - ) - ) - - multi_match_query = Q("multi_match", query=query, fields=fields) - - if match_phrase_boost_queries: - multi_match_query |= Q("bool", should=match_phrase_boost_queries) - - return multi_match_query + # remove default_field + options.pop("default_field", None) + return FullTextQueryBuilder(**options) def parse_query(q: str | None) -> 
QueryAnalysis: """Begin query analysis by parsing the query.""" analysis = QueryAnalysis(text_query=q) - if q is None: + if q is None or not q.strip(): return analysis try: analysis.luqum_tree = parser.parse(q) + # FIXME: resolve UnknownFilter (to AND) except ( luqum.exceptions.ParseError, luqum.exceptions.InconsistentQueryException, ) as e: - # if the lucene syntax is invalid, consider the query as plain text - logger.warning("parsing error for query: '%s':\n%s", q, e) - analysis.luqum_tree = None + raise InvalidLuceneQueryError("Request could not be analyzed by luqum") from e return analysis -def decompose_query( - q: QueryAnalysis, filter_query_builder: ElasticsearchQueryBuilder -) -> QueryAnalysis: - """Decompose the query into two parts: - - - a Lucene DSL query, which is used as a filter clause in the - Elasticsearch query. Luqum library is used to transform the - Lucene DSL into Elasticsearch DSL. - - remaining terms, used for full text search. - - :param q: the user query - :param filter_query_builder: Luqum query builder - :return: a tuple containing the Elasticsearch filter clause and - the remaining terms for full text search - """ - if q.text_query is None: - return q - remaining_terms = "" - if q.luqum_tree is not None: - # Successful parsing - logger.debug("parsed luqum tree: %s", repr(q.luqum_tree)) - word_children = [] - filter_children = [] - if isinstance(q.luqum_tree, (tree.UnknownOperation, tree.AndOperation)): - for child in q.luqum_tree.children: - if isinstance(child, tree.Word): - word_children.append(child) - else: - filter_children.append(child) - elif isinstance(q.luqum_tree, tree.Word): - # the query single term - word_children.append(q.luqum_tree) - else: - filter_children.append(q.luqum_tree) - # We join with space every non word not recognized by the parser - remaining_terms = " ".join(item.value for item in word_children) - filter_tree = None - if filter_children: - # Note: we always wrap in AndOperation, - # even if only one, to be consistent - filter_tree = tree.AndOperation(*filter_children) - - # remove harvested words - logger.debug("filter luqum tree: %s", repr(filter_tree)) - if filter_tree: - filter_query = filter_query_builder(filter_tree) - else: - filter_query = None - logger.debug("filter query from luqum: '%s'", filter_query) - else: - filter_query = None - remaining_terms = q.text_query - - return q.clone(fulltext=remaining_terms, filter_query=filter_query) - - def compute_facets_filters(q: QueryAnalysis) -> QueryAnalysis: """Extract facets filters from the query @@ -264,7 +152,7 @@ def parse_sort_by_script( operator, sort_by = str_utils.split_sort_by_sign(sort_by) script = (config.scripts or {}).get(sort_by) if script is None: - raise ValueError(f"Unknown script '{sort_by}'") + raise UnknownScriptError(f"Unknown script '{sort_by}'") script_id = get_script_id(index_id, sort_by) # join params and static params script_params = dict((params or {}), **(script.static_params or {})) @@ -297,45 +185,110 @@ def create_aggregation_clauses( return clauses +def add_languages_suffix( + analysis: QueryAnalysis, langs: list[str], config: IndexConfig +) -> QueryAnalysis: + """Add correct languages suffixes to fields of type text_lang or taxonomy + + This match in a langage OR another + """ + if analysis.luqum_tree is None: + return analysis + transformer = LanguageSuffixTransformer( + lang_fields=set(config.lang_fields), langs=langs + ) + analysis.luqum_tree = transformer.visit(analysis.luqum_tree) + return analysis + + +def 
resolve_unknown_operation(analysis: QueryAnalysis) -> QueryAnalysis:
+    """Resolve unknown operations in the query to an AND"""
+    if analysis.luqum_tree is None:
+        return analysis
+    transformer = UnknownOperationResolver(resolve_to=tree.AndOperation)
+    analysis.luqum_tree = transformer.visit(analysis.luqum_tree)
+    return analysis
+
+
+def boost_phrases(
+    analysis: QueryAnalysis, boost: float, proximity: int | None
+) -> QueryAnalysis:
+    """Boost all phrases in the query"""
+    if analysis.luqum_tree is None:
+        return analysis
+    transformer = PhraseBoostTransformer(boost=boost, proximity=proximity)
+    analysis.luqum_tree = transformer.visit(analysis.luqum_tree)
+    return analysis
+
+
+def check_query(params: SearchParameters, analysis: QueryAnalysis) -> None:
+    """Run some sanity checks on the luqum query"""
+    if analysis.luqum_tree is None:
+        return
+    checker = QueryCheck(index_config=params.index_config, zeal=1)
+    errors = checker.errors(analysis.luqum_tree)
+    if errors:
+        raise QueryCheckError("Found errors while checking query", errors=errors)
+
+
+def resolve_open_ranges(analysis: QueryAnalysis) -> QueryAnalysis:
+    """We need to resolve open ranges to closed ranges
+    before using the elasticsearch query builder"""
+    if analysis.luqum_tree is None:
+        return analysis
+    transformer = OpenRangeTransformer()
+    analysis.luqum_tree = transformer.visit(analysis.luqum_tree)
+    return analysis
+
+
 def build_search_query(
     params: SearchParameters,
-    filter_query_builder: ElasticsearchQueryBuilder,
+    es_query_builder: ElasticsearchQueryBuilder,
 ) -> QueryAnalysis:
     """Build an elasticsearch_dsl Query.
 
-    :param q: the user raw query
-    :param langs: the set of languages we want to support, it is used to
-        select language subfields for some field types
-    :param size: number of results to return
-    :param page: requested page (starts at 1).
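Editor's note: a standalone sketch of the two luqum passes defined above (not project code; it assumes a luqum version with open-range support, and the exact string rendering of the result may differ slightly):

    from luqum import tree
    from luqum.parser import parser
    from luqum.utils import OpenRangeTransformer, UnknownOperationResolver

    parsed = parser.parse("whole milk nutriments.sugars_100g:>10")
    # the implicit operation between "whole" and "milk" becomes an explicit AND
    resolved = UnknownOperationResolver(resolve_to=tree.AndOperation).visit(parsed)
    # the open range ">10" is merged into an explicit range with an unbounded upper end
    closed = OpenRangeTransformer().visit(resolved)
    print(repr(closed))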
- :param config: the index configuration to use - :param filter_query_builder: luqum elasticsearch query builder - :param sort_by: sorting key, defaults to None (=relevance-based sorting) + :param params: SearchParameters containing all search parameters + :param es_query_builder: the builder to transform + the luqum tree to an elasticsearch query :return: the built Search query """ analysis = parse_query(params.q) - analysis = decompose_query(analysis, filter_query_builder) analysis = compute_facets_filters(analysis) + analysis = resolve_unknown_operation(analysis) + analysis = resolve_open_ranges(analysis) + if params.boost_phrase and params.sort_by is None: + analysis = boost_phrases( + analysis, + params.index_config.match_phrase_boost, + params.index_config.match_phrase_boost_proximity, + ) + # add languages for localized fields + analysis = add_languages_suffix(analysis, params.langs, params.index_config) + # we are at a goop point to check the query + check_query(params, analysis) - logger.debug("filter query: %s", analysis.filter_query) - logger.debug("remaining terms: '%s'", analysis.fulltext) + logger.debug("luqum query: %s", analysis.luqum_tree) - return build_es_query(analysis, params) + return build_es_query(analysis, params, es_query_builder) def build_es_query( - q: QueryAnalysis, + analysis: QueryAnalysis, params: SearchParameters, + es_query_builder: ElasticsearchQueryBuilder, ) -> QueryAnalysis: config = params.index_config es_query = Search(index=config.index.name) - - if q.fulltext: - base_multi_match_q = build_query_clause(q.fulltext, params.langs, config) - es_query = es_query.query(base_multi_match_q) - - if q.filter_query: - es_query = es_query.query("bool", filter=q.filter_query) + # main query + if analysis.luqum_tree is not None: + try: + es_query = es_query.query( + es_query_builder(analysis.luqum_tree, params.index_config, params.langs) + ) + except luqum.exceptions.InconsistentQueryException as e: + raise InvalidLuceneQueryError( + "Request could not be transformed by luqum" + ) from e agg_fields = set(params.facets) if params.facets is not None else set() if params.charts is not None: @@ -343,7 +296,7 @@ def build_es_query( [ chart.field for chart in params.charts - if chart.chart_type == "DistributionChartType" + if chart.chart_type == "DistributionChart" ] ) for agg_name, agg in create_aggregation_clauses(config, agg_fields).items(): @@ -363,7 +316,7 @@ def build_es_query( size=params.page_size, from_=params.page_size * (params.page - 1), ) - return q.clone(es_query=es_query) + return analysis.clone(es_query=es_query) def build_completion_query( @@ -386,7 +339,7 @@ def build_completion_query( """ completion_clause = { - "field": f"names.{lang}", + "field": f"synonyms.{lang}", "size": size, "contexts": {"taxonomy_name": taxonomy_names}, } diff --git a/app/query_transformers.py b/app/query_transformers.py new file mode 100644 index 00000000..051eb26a --- /dev/null +++ b/app/query_transformers.py @@ -0,0 +1,218 @@ +import re + +import luqum.check +import luqum.visitor +from luqum import tree + +from .config import IndexConfig + + +class LanguageSuffixTransformer(luqum.visitor.TreeTransformer): + """This transformer adds a language suffix to lang_fields fields, + for any languages in langs (the languages we want to query on). 
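Editor's note: a sketch of what the completion clause above amounts to in practice (the suggestion name, size, languages, taxonomy names and index name are illustrative, not project constants):

    from elasticsearch_dsl import Search

    completion_clause = {
        "field": "synonyms.fr",  # per-language Completion subfield of the taxonomy index
        "size": 5,
        "contexts": {"taxonomy_name": ["categories", "labels"]},
    }
    # hypothetical usage: attach it as a suggester on a Search object
    query = Search(index="off_taxonomy").suggest(
        "taxonomy_suggest", "choco", completion=completion_clause
    )
    print(query.to_dict())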
+ + That is `field1:something` will become + `field1:en:something OR field1:fr:something` + + Note: we do this only for the query parts that have a search field, + the text search without specifying a field + is handled by the ElasticSearch query builder + """ + + def __init__(self, lang_fields=set[str], langs=list[str], **kwargs): + # we need to track parents to get full field name + super().__init__(track_parents=True, track_new_parents=False, **kwargs) + self.langs = langs + self.lang_fields = lang_fields + + def visit_search_field(self, node, context): + """As we reach a search_field, + if it's one that have a lang, + we replace single expression with a OR on sub-language fields + """ + # FIXME: verify again the way luqum work on this side ! + field_name = node.name + # add eventual parents + prefix = ".".join( + node.name + for node in context.get("parents", ()) + if isinstance(node, tree.SearchField) + ) + if prefix: + field_name = f"{prefix}.{field_name}" + # is it a lang dependant field + if field_name in self.lang_fields: + # create a new expression for each languages + new_nodes = [] + for lang in self.langs: + # note: we don't have to care about having searchfield in children + # because only complete field_name would match a self.lang_fields + (new_node,) = self.generic_visit(node, context) + # add language prefix + new_node.name = f"{new_node.name}.{lang}" + new_nodes.append(new_node) + if len(new_nodes) > 1: + yield tree.OrOperation(*new_nodes) + else: + yield from new_nodes + else: + # default + yield from self.generic_visit(node, context) + + +def get_consecutive_words( + node: tree.BoolOperation, +) -> list[list[tuple[int, tree.Word]]]: + """Return a list of list of consecutive words, + with their index, in a bool operation + """ + consecutive: list[list[tuple[int, tree.Word]]] = [[]] + for i, child in enumerate(node.children): + if isinstance(child, tree.Word): + # append to last list + consecutive[-1].append((i, child)) + else: + # we have discontinuity + if len(consecutive[-1]) == 1: + # one term alone is not enough, clear the list + consecutive[-1] = [] + elif consecutive[-1]: + # create a new list + consecutive.append([]) + # remove last list if empty or only one element + if len(consecutive[-1]) <= 1: + consecutive.pop() + return consecutive + + +class PhraseBoostTransformer(luqum.visitor.TreeTransformer): + """This transformer boosts terms that are consecutive + and might be found in a query + + For example if we have `Whole AND Milk AND Cream` + we will boost items containing `"Whole Milk Cream"`, + the new expression will look like + (here with a boost of 2 and proxmity of 3): + `((Whole AND Milk AND Cream^2) OR "Whole Milk Cream"^2.0~3)` + + We also only apply it to terms that are not for a specified field. + + Note: It won't work on UnknownOperation, so you'd better resolve them before. 
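Editor's note: a quick usage sketch of LanguageSuffixTransformer as defined above (illustrative field and languages; luqum's exact string rendering may vary):

    from luqum.parser import parser

    # assuming "labels" is one of the configured language-suffixed fields
    transformer = LanguageSuffixTransformer(lang_fields={"labels"}, langs=["en", "fr"])
    parsed = parser.parse("labels:organic AND nutriments.sugars_100g:[0 TO 5]")
    expanded = transformer.visit(parsed)
    # labels:organic is replaced by an OR over labels.en:organic and labels.fr:organic,
    # while the non-localized field is left untouched
    print(str(expanded))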
+
+    :param boost: how much to boost consecutive terms
+    :param proximity: proximity of the boosted phrase, enabling matches with gaps
+    :param only_free_text: only apply to text without an explicit search field defined
+    """
+
+    def __init__(
+        self, boost: float, proximity: int | None = 1, only_free_text=True, **kwargs
+    ):
+        super().__init__(track_parents=True, track_new_parents=False, **kwargs)
+        # we transform float to str,
+        # because otherwise decimal.Decimal will make it look weird
+        self.boost = str(boost)
+        self.proximity = proximity
+        self.only_free_text = only_free_text
+
+    def _get_consecutive_words(self, node):
+        return get_consecutive_words(node)
+
+    def _phrase_boost_from_words(self, words):
+        """Given a group of words, build the new operation"""
+        expr = " ".join(word.value for word in words)
+        expr = f'"{expr}"'
+        phrase = tree.Phrase(expr)
+        if self.proximity:
+            phrase = tree.Proximity(phrase, degree=self.proximity)
+        phrase = tree.Boost(phrase, force=self.boost, head=" ")
+        new_expr = tree.Group(
+            tree.OrOperation(tree.Group(tree.AndOperation(*words), tail=" "), phrase)
+        )
+        # transfer head and tail, to keep a good looking str
+        new_expr.head = words[0].head
+        words[0].head = ""
+        new_expr.tail = words[-1].tail
+        words[-1].tail = ""
+        return new_expr
+
+    def visit_and_operation(self, node, context):
+        """As we find an AND operation, try to boost consecutive word terms"""
+        # get the and operation with cloned children
+        (new_node,) = list(super().generic_visit(node, context))
+        do_boost_phrases = True
+        if self.only_free_text:
+            # we don't do it if a parent is a SearchField
+            do_boost_phrases = not any(
+                isinstance(p, tree.SearchField) for p in context.get("parents", ())
+            )
+        if do_boost_phrases:
+            # group consecutive terms in AndOperations
+            consecutive = self._get_consecutive_words(new_node)
+            if consecutive:
+                # We have to modify children
+                # by replacing consecutive words with our new expressions.
+                # We use indexes for that.
+                new_children = []
+                # replace the first word of each group with the new operation
+                index_to_change = {
+                    words[0][0]: self._phrase_boost_from_words(
+                        [word[1] for word in words]
+                    )
+                    for words in consecutive
+                }
+                # remove other words that are part of the expression
+                # (and we will keep the rest)
+                index_to_remove = set(
+                    word[0] for words in consecutive for word in words[1:]
+                )
+                for i, child in enumerate(new_node.children):
+                    if i in index_to_change:
+                        new_children.append(index_to_change[i])
+                    elif i not in index_to_remove:
+                        new_children.append(child)
+                # substitute children of the new node
+                new_node.children = new_children
+        yield new_node
+
+
+class QueryCheck(luqum.check.LuceneCheck):
+    """Sanity checks on the luqum request"""
+
+    # TODO: port to luqum
+    SIMPLE_EXPR_FIELDS = luqum.check.LuceneCheck.SIMPLE_EXPR_FIELDS + (
+        tree.Range,
+        tree.OpenRange,
+    )
+    FIELD_EXPR_FIELDS = SIMPLE_EXPR_FIELDS + (tree.FieldGroup,)
+    # TODO: shouldn't luqum support "." in field names?
+    field_name_re = re.compile(r"^[\w.]+$")
+
+    def __init__(self, index_config: IndexConfig, **kwargs):
+        super().__init__(**kwargs)
+        self.index_config = index_config
+
+    # TODO: this should be in LuceneCheck!
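Editor's note: a matching sketch for PhraseBoostTransformer (standalone, not project code; the printed string is approximate):

    from luqum import tree
    from luqum.parser import parser
    from luqum.utils import UnknownOperationResolver

    parsed = parser.parse("whole milk labels:organic")
    # the transformer expects explicit AND operations, so resolve the implicit one first
    resolved = UnknownOperationResolver(resolve_to=tree.AndOperation).visit(parsed)
    boosted = PhraseBoostTransformer(boost=2.0, proximity=2).visit(resolved)
    # "whole" and "milk" are consecutive free-text words, so a boosted phrase
    # alternative is added, roughly:
    #   ((whole AND milk) OR "whole milk"^2.0~2) AND labels:organic
    print(str(boosted))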
+ def check_phrase(self, item, parents): + return iter([]) + + def check_open_range(self, item, parents): + return iter([]) + + def check_search_field(self, item, parents): + """Check if the search field is valid""" + yield from super().check_search_field(item, parents) + # might be an inner field get all parents fields + fields = [p.name for p in parents if isinstance(p, tree.SearchField)] + [ + item.name + ] + # join and split to normalize and only have one field + field_names = (".".join(fields)).split(".") + # remove eventual lang suffix + has_lang_suffix = field_names[-1] in self.index_config.supported_langs_set + if has_lang_suffix: + field_names.pop() + is_sub_field = len(field_names) > 1 + # check field exists in config, but only for non sub-field + # (TODO until we implement them in config) + if not is_sub_field and (field_names[0] not in self.index_config.fields): + yield f"Search field '{'.'.join(field_names)}' not found in index config" diff --git a/app/search.py b/app/search.py index ce27eaf1..6a607a55 100644 --- a/app/search.py +++ b/app/search.py @@ -2,7 +2,14 @@ from typing import cast from . import config -from ._types import SearchParameters, SearchResponse, SuccessSearchResponse +from ._types import ( + DebugInfo, + QueryAnalysis, + SearchParameters, + SearchResponse, + SearchResponseDebug, + SuccessSearchResponse, +) from .charts import build_charts from .facets import build_facets from .postprocessing import BaseResultProcessor, load_result_processor @@ -10,23 +17,47 @@ logger = logging.getLogger(__name__) -if config.CONFIG is None: - # We want to be able to import api.py (for tests for example) without - # failure, but we add a warning message as it's not expected in a - # production settings - logger.warning("Main configuration is not set, use CONFIG_PATH envvar") - FILTER_QUERY_BUILDERS = {} - RESULT_PROCESSORS = {} -else: - # we cache query builder and result processor here for faster processing - FILTER_QUERY_BUILDERS = { - index_id: build_elasticsearch_query_builder(index_config) - for index_id, index_config in config.CONFIG.indices.items() - } - RESULT_PROCESSORS = { - index_id: load_result_processor(index_config) - for index_id, index_config in config.CONFIG.indices.items() - } + +# we cache query builder and result processor here for faster processing +_ES_QUERY_BUILDERS = {} +_RESULT_PROCESSORS = {} + + +def get_es_query_builder(index_id): + if index_id not in _ES_QUERY_BUILDERS: + index_config = config.get_config().indices[index_id] + _ES_QUERY_BUILDERS[index_id] = build_elasticsearch_query_builder(index_config) + return _ES_QUERY_BUILDERS[index_id] + + +def get_result_processor(index_id): + if index_id not in _RESULT_PROCESSORS: + index_config = config.get_config().indices[index_id] + _RESULT_PROCESSORS[index_id] = load_result_processor(index_config) + return _RESULT_PROCESSORS[index_id] + + +def add_debug_info( + search_result: SuccessSearchResponse, + analysis: QueryAnalysis, + params: SearchParameters, +) -> SearchResponseDebug | None: + if not params.debug_info: + return None + data = {} + for debug_info in params.debug_info: + match debug_info: + case DebugInfo.es_query: + data[debug_info.value] = ( + analysis.es_query.to_dict() if analysis.es_query else None + ) + case DebugInfo.lucene_query: + data[debug_info.value] = ( + str(analysis.luqum_tree) if analysis.luqum_tree else None + ) + case DebugInfo.aggregations: + data[debug_info.value] = search_result.aggregations + return SearchResponseDebug(**data) def search( @@ -34,7 +65,7 @@ def search( ) -> 
SearchResponse: """Run a search""" result_processor = cast( - BaseResultProcessor, RESULT_PROCESSORS[params.valid_index_id] + BaseResultProcessor, get_result_processor(params.valid_index_id) ) logger.debug( "Received search query: q='%s', langs='%s', page=%d, " @@ -50,13 +81,18 @@ def search( index_config = params.index_config query = build_search_query( params, - # filter query builder is generated from elasticsearch mapping and + # ES query builder is generated from elasticsearch mapping and # takes ~40ms to generate, build-it before hand to avoid this delay - filter_query_builder=FILTER_QUERY_BUILDERS[params.valid_index_id], + es_query_builder=get_es_query_builder(params.valid_index_id), ) - logger.debug( - "Elasticsearch query: %s", - query.es_query.to_dict() if query.es_query else query.es_query, + ( + logger.debug( + "Luqum query: %s\nElasticsearch query: %s", + str(query.luqum_tree), + query.es_query.to_dict() if query.es_query else query.es_query, + ) + if logger.isEnabledFor(logging.DEBUG) # avoid processing if no debug + else None ) projection = set(params.fields) if params.fields else None @@ -72,6 +108,7 @@ def search( search_result, query, params.main_lang, index_config, params.facets ) search_result.charts = build_charts(search_result, index_config, params.charts) - # remove aggregations to avoid sending too much information + search_result.debug = add_debug_info(search_result, query, params) + # remove aggregations search_result.aggregations = None return search_result diff --git a/app/taxonomy.py b/app/taxonomy.py index 6b7b6d05..e1face38 100644 --- a/app/taxonomy.py +++ b/app/taxonomy.py @@ -3,6 +3,7 @@ See also :py:mod:`app.taxonomy_es` """ +from collections.abc import Iterator from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Set, Union @@ -10,7 +11,7 @@ import requests from app._types import JSONType -from app.config import settings +from app.config import TaxonomyConfig, settings from app.utils import get_logger from app.utils.download import download_file, http_session, should_download_file from app.utils.io import load_json @@ -339,6 +340,12 @@ def get_taxonomy( ~/.cache/openfoodfacts/taxonomy :return: a Taxonomy """ + if taxonomy_url.startswith("file://"): + # just use the file, it's already local + fpath = taxonomy_url[len("file://") :] + if not fpath.startswith("/"): + raise RuntimeError("Relative path (not yet) supported for taxonomy url") + return Taxonomy.from_path(fpath.rstrip("/")) filename = f"{taxonomy_name}.json" cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else cache_dir @@ -353,3 +360,10 @@ def get_taxonomy( logger.info("Downloading taxonomy, saving it in %s", taxonomy_path) download_file(taxonomy_url, taxonomy_path) return Taxonomy.from_path(taxonomy_path) + + +def iter_taxonomies(taxonomy_config: TaxonomyConfig) -> Iterator[tuple[str, Taxonomy]]: + for taxonomy_source_config in taxonomy_config.sources: + yield taxonomy_source_config.name, get_taxonomy( + taxonomy_source_config.name, str(taxonomy_source_config.url) + ) diff --git a/app/taxonomy_es.py b/app/taxonomy_es.py index 24361c6e..a8f713e4 100644 --- a/app/taxonomy_es.py +++ b/app/taxonomy_es.py @@ -3,16 +3,24 @@ See also :py:mod:`app.taxonomy` """ +import os +import re +import shutil +from pathlib import Path + from elasticsearch_dsl import Search from elasticsearch_dsl.query import Q from app.config import IndexConfig +from app.taxonomy import Taxonomy, iter_taxonomies +from app.utils import connection +from app.utils.io import safe_replace_dir def 
get_taxonomy_names( items: list[tuple[str, str]], config: IndexConfig, -) -> dict[tuple[str, str], dict[str, list[str]]]: +) -> dict[tuple[str, str], dict[str, str]]: """Given a set of terms in different taxonomies, return their names""" filters = [] for id, taxonomy_name in items: @@ -24,6 +32,88 @@ def get_taxonomy_names( .params(size=len(filters)) ) return { - (result.id, result.taxonomy_name): result.names.to_dict() + (result.id, result.taxonomy_name): result.name.to_dict() for result in query.execute().hits } + + +def _normalize_synonym(token: str) -> str: + """Normalize a synonym, + + It applies the same filter as ES will apply before the synonym filter + to ensure matching tokens + """ + # make lower case + token = token.lower() + # changes anything that is neither a word char nor a space for space + token = re.sub(r"[^\w\s]+", " ", token) + # normalize spaces + token = re.sub(r"\s+", " ", token) + # TODO: should we also run asciifolding or so ? Or depends on language ? + return token + + +def create_synonyms_files(taxonomy: Taxonomy, langs: list[str], target_dir: Path): + """Create a set of files that can be used to define a Synonym Graph Token Filter + + We will match every known synonym in a language + to the identifier of the entry. + We do this because we are not sure which is the main language for an entry. + + Also the special xx language is added to every languages if it exists. + + see: + https://www.elastic.co/guide/en/elasticsearch/reference/current/search-with-synonyms.html#synonyms-store-synonyms-file + """ + + # auto-generate synonyms files for each language, ready to write to + synonyms_paths = {lang: (target_dir / f"{lang}.txt") for lang in langs} + synonyms_files = {lang: fpath.open("w") for lang, fpath in synonyms_paths.items()} + + for node in taxonomy.iter_nodes(): + # we add multi lang synonyms to every language + multi_lang_synonyms = node.synonyms.get("xx", []) + multi_lang_synonyms = [_normalize_synonym(s) for s in multi_lang_synonyms] + # also node id without prefix + multi_lang_synonyms.append(_normalize_synonym(node.id.split(":", 1)[-1])) + multi_lang_synonyms = [s for s in multi_lang_synonyms if s.strip()] + for lang, synonyms in node.synonyms.items(): + if (not synonyms and not multi_lang_synonyms) or lang not in langs: + continue + # avoid commas in synonyms… add multilang syns and identifier without prefix + synonyms_ = (_normalize_synonym(s) for s in synonyms) + synonyms = [s for s in synonyms_ if s.strip()] + synonyms = sorted(set(synonyms + multi_lang_synonyms)) + synonyms = [s for s in synonyms if s.strip()] + if synonyms: + synonyms_files[lang].write(f"{','.join(synonyms)} => {node.id}\n") + + # close files + for f in synonyms_files.values(): + f.close() + + +def create_synonyms(index_config: IndexConfig, target_dir: Path): + for name, taxonomy in iter_taxonomies(index_config.taxonomy): + target = target_dir / name + # a temporary directory, we move at the end + target_tmp = target_dir / f"{name}.tmp" + shutil.rmtree(target_tmp, ignore_errors=True) + # ensure directory + os.makedirs(target_tmp, mode=0o775, exist_ok=True) + # generate synonyms files + create_synonyms_files(taxonomy, index_config.supported_langs, target_tmp) + # move to final location, overriding previous files + safe_replace_dir(target, target_tmp) + # Note: in current deployment, file are shared between ES instance, + # so we don't need to replicate the files + + +def refresh_synonyms(index_name: str, index_config: IndexConfig, target_dir: Path): + create_synonyms(index_config, 
target_dir) + es = connection.current_es_client() + if es.indices.exists(index=index_name): + # trigger update of synonyms in token filters by reloading search analyzers + # and clearing relevant cache + es.indices.reload_search_analyzers(index=index_name) + es.indices.clear_cache(index=index_name, request=True) diff --git a/app/utils/analyzers.py b/app/utils/analyzers.py index f091daba..f89f356d 100644 --- a/app/utils/analyzers.py +++ b/app/utils/analyzers.py @@ -1,15 +1,183 @@ """Defines some analyzers for the elesaticsearch fields.""" -from elasticsearch_dsl import analyzer - -#: An analyzer for the autocomplete field -AUTOCOMPLETE_ANALYZERS = { - "fr": analyzer( - "autocomplete_fr", tokenizer="standard", filter=["lowercase", "asciifolding"] - ), - "de": analyzer( - "autocomplete_de", - tokenizer="standard", - filter=["lowercase", "german_normalization"], - ), +from typing import Optional + +from elasticsearch_dsl import Mapping +from elasticsearch_dsl import analysis as dsl_analysis +from elasticsearch_dsl import analyzer, char_filter, token_filter + +from app._types import JSONType + +# some normalizers existing in ES that are specific to some languages +SPECIAL_NORMALIZERS = { + "ar": "arabic_normalization", + "bn": "bengali_normalization", + "de": "german_normalization", + "hi": "hindi_normalization", + "inc": "indic_normalization", + "fa": "persian_normalization", + "sv": "scandinavian_folding", + "da": "scandinavian_folding", + "no": "scandinavian_folding", + "fi": "scandinavian_folding", + "sr": "serbian_normalization", + "ckb": "sorani_normalization", +} + + +# TODO: this could be provided by the taxonomy / per language +STOP_WORDS = { + "ar": "_arabic_", + "hy": "_armenian_", + "eu": "_basque_", + "bn": "_bengali_", + # "pt_BR": _brazilian_ + "bg": "_bulgarian_", + "ca": "_catalan_", + "ja": "_cjk_", + "zh": "_cjk_", + "ko": "_cjk_", + "cs": "_czech_", + "da": "_danish_", + "nl": "_dutch_", + "en": "_english_", + "et": "_estonian_", + "fi": "_finnish_", + "fr": "_french_", + "gl": "_galician_", + "de": "_german_", + "el": "_greek_", + "hi": "_hindi_", + "hu": "_hungarian_", + "id": "_indonesian_", + "ga": "_irish_", + "it": "_italian_", + "lv": "_latvian_", + "lt": "_lithuanian_", + "no": "_norwegian_", + "fa": "_persian_", + "pt": "_portuguese_", + "ro": "_romanian_", + "ru": "_russian_", + "sr": "_serbian_", + # "": "_sorani_", + "es": "_spanish_", + "sv": "_swedish_", + "th": "_thai_", + "tr": "_turkish_ ", } + + +def get_taxonomy_synonym_filter(taxonomy: str, lang: str) -> dsl_analysis.TokenFilter: + """Return the synonym filter to use for the taxonomized field analyzer""" + return token_filter( + f"synonym_graph_{taxonomy}_{lang}", + type="synonym_graph", + synonyms_path=f"synonyms/{taxonomy}/{lang}.txt", + updateable=True, + ) + + +def get_taxonomy_stop_words_filter( + taxonomy: str, lang: str +) -> Optional[dsl_analysis.TokenFilter]: + """Return the stop words filter to use for the taxonomized field analyzer + + IMPORTANT: de-activated for now ! + If we want to handle them, we have to remove them in synonyms, so we need the list. 
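Editor's note: to make the files referenced by `synonyms_path` concrete, a small sketch (the entries are invented; real files are produced per taxonomy and per language by create_synonyms_files in app/taxonomy_es.py):

    from app.taxonomy_es import _normalize_synonym

    # a generated synonyms/<taxonomy>/<lang>.txt file contains one explicit mapping
    # per taxonomy entry, e.g. (hypothetical lines):
    #   organic,organically grown => en:organic
    #   fair trade,fairtrade => en:fair-trade
    # left-hand side terms are normalized before being written, so they match the
    # tokens ES produces ahead of the synonym_graph filter:
    assert _normalize_synonym("Gluten-Free") == "gluten free"
    assert _normalize_synonym("Fair  Trade") == "fair trade"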
+ """ + stop_words = STOP_WORDS.get(lang) + # deactivate for now + if False and stop_words: + return token_filter( + f"taxonomy_stop_words_{lang}", + type="stop", + stopwords=stop_words, + remove_trailing=True, + ) + return None + + +TAXONOMIES_CHAR_FILTER = char_filter( + "taxonomies_char_filter", + type="mapping", + mappings=[ + # hyphen to underscore + "- => _", + # and escape quotes, so that ES cut words on them + r"' => \\'", + r"’ => \\'", + ], +) + + +def get_taxonomy_indexing_analyzer( + taxonomy: str, lang: str +) -> dsl_analysis.CustomAnalysis: + """We want to index taxonomies terms as keywords (as we only store the id), + but with a specific tweak: transform hyphens into underscores, + """ + # does not really depends on taxonomy and lang + return analyzer( + "taxonomy_indexing", + tokenizer="keyword", + char_filter=[TAXONOMIES_CHAR_FILTER], + ) + + +def get_taxonomy_search_analyzer( + taxonomy: str, lang: str, with_synonyms: bool +) -> dsl_analysis.CustomAnalysis: + """Return the search analyzer to use for the taxonomized field + + :param taxonomy: the taxonomy name + :param lang: the language code + :param with_synonyms: whether to add the synonym filter + """ + # we replace hyphen with underscore + filters: list[str | token_filter] = [ + "lowercase", + ] + stop_words = get_taxonomy_stop_words_filter(taxonomy, lang) + if stop_words: + filters.append(stop_words) + filters.append(SPECIAL_NORMALIZERS.get(lang, "asciifolding")) + if with_synonyms: + filters.append( + get_taxonomy_synonym_filter(taxonomy, lang), + ) + return analyzer( + f"search_{taxonomy}_{lang}", + char_filter=[TAXONOMIES_CHAR_FILTER], + tokenizer="standard", + filter=filters, + ) + + +def get_autocomplete_analyzer(lang: str) -> dsl_analysis.CustomAnalysis: + """Return the search analyzer to use for the autocomplete field""" + return analyzer( + f"autocomplete_{lang}", + tokenizer="standard", + filter=["lowercase", SPECIAL_NORMALIZERS.get(lang, "asciifolding")], + ) + + +def number_of_fields(mapping: Mapping | dict[str, JSONType]) -> int: + """Return the number of fields in the mapping""" + count = 0 + properties: dict[str, JSONType] = ( + mapping.to_dict().get("properties", {}) + if isinstance(mapping, Mapping) + else mapping + ) + for field, value in properties.items(): + if isinstance(value, dict): + if props := value.get("properties"): + # object field with properties + count += number_of_fields(props) + if fields := value.get("fields"): + # subfields + count += number_of_fields(fields) + count += 1 + return count diff --git a/app/utils/io.py b/app/utils/io.py index 34a93d2b..bf7859a4 100644 --- a/app/utils/io.py +++ b/app/utils/io.py @@ -1,4 +1,5 @@ import gzip +import shutil from pathlib import Path from typing import Callable, Iterable @@ -54,3 +55,27 @@ def dump_json(path: str | Path, item: JSONType, **kwargs): open_fn = get_open_fn(path) with open_fn(str(path), "wb") as f: f.write(orjson.dumps(item, **kwargs)) + + +def safe_replace_dir(target: Path, new_target: Path): + """Replace a directory atomically""" + # a temporary place for the target dir + old_target = target.with_suffix(target.suffix + ".old") + # move target to old_target + if old_target.exists(): + shutil.rmtree(old_target) + if target.exists(): + shutil.move(target, old_target) + # move our file + try: + shutil.move(new_target, target) + except Exception: + # if something went wrong, we restore the old target + if old_target.exists(): + shutil.move(old_target, target) + # reraise + raise + else: + # cleanup + if old_target.exists(): + 
+            shutil.rmtree(old_target)
diff --git a/app/validations.py b/app/validations.py
index b3b764d7..3164168a 100644
--- a/app/validations.py
+++ b/app/validations.py
@@ -1,6 +1,4 @@
-from typing import cast
-
-from .config import CONFIG, Config
+from .config import Config, get_config
 
 
 def check_index_id_is_defined(index_id: str | None, config: Config) -> None:
@@ -31,7 +29,7 @@ def check_all_values_are_fields_agg(
     errors: list[str] = []
     if values is None:
         return errors
-    global_config = cast(Config, CONFIG)
+    global_config = get_config()
     index_id, index_config = global_config.get_index_config(index_id)
     if index_config is None:
         raise ValueError(f"Cannot get index config for index_id {index_id}")
@@ -55,7 +53,7 @@ def check_fields_are_numeric(
     if values is None:
         return errors
 
-    global_config = cast(Config, CONFIG)
+    global_config = get_config()
     index_id, index_config = global_config.get_index_config(index_id)
     if index_config is None:
         raise ValueError(f"Cannot get index config for index_id {index_id}")
diff --git a/data/config/openfoodfacts.yml b/data/config/openfoodfacts.yml
index 256e366b..a12d3b73 100644
--- a/data/config/openfoodfacts.yml
+++ b/data/config/openfoodfacts.yml
@@ -44,19 +44,20 @@ indices:
       categories:
         full_text_search: true
         input_field: categories_tags
-        taxonomy_name: category
+        taxonomy_name: categories
         type: taxonomy
+        bucket_agg: true
       labels:
         full_text_search: true
         input_field: labels_tags
-        taxonomy_name: label
+        taxonomy_name: labels
         type: taxonomy
+        bucket_agg: true
       brands:
         full_text_search: true
-        split: true
-        type: text
-      brands_tags:
-        type: keyword
+        input_field: brands_tags
+        type: taxonomy
+        taxonomy_name: brands
         bucket_agg: true
       stores:
         split: true
@@ -74,27 +75,22 @@
         bucket_agg: true
       quantity:
         type: text
-      categories_tags:
-        type: keyword
-        taxonomy_name: category
-        bucket_agg: true
-      labels_tags:
-        type: keyword
-        taxonomy_name: label
-        bucket_agg: true
-      countries_tags:
-        type: keyword
+      countries:
+        type: taxonomy
+        input_field: countries_tags
         bucket_agg: true
-        taxonomy_name: country
-      states_tags:
-        type: keyword
+        taxonomy_name: countries
+      states:
+        type: taxonomy
+        input_field: states_tags
         bucket_agg: true
-        taxonomy_name: state
+        taxonomy_name: states
       origins_tags:
         type: keyword
-      ingredients_tags:
-        type: keyword
-        taxonomy_name: ingredient
+      ingredients:
+        type: taxonomy
+        input_field: ingredients_tags
+        taxonomy_name: ingredients
       unique_scans_n:
         type: integer
       scans_n:
@@ -116,9 +112,11 @@
         type: disabled
       additives_n:
         type: integer
-      allergens_tags:
-        type: keyword
-        taxonomy_name: allergen
+      allergens:
+        type: taxonomy
+        input_field: allergens_tags
+        taxonomy_name: allergens
+        bucket_agg: true
       ecoscore_data:
         type: disabled
       ecoscore_score:
@@ -163,74 +161,69 @@
         accent_color: "#ff8714"
     taxonomy:
       sources:
-        - name: category
+        - name: categories
          url: https://static.openfoodfacts.org/data/taxonomies/categories.full.json
-        - name: label
+        - name: labels
          url: https://static.openfoodfacts.org/data/taxonomies/labels.full.json
-        - name: additive
+        - name: additives
          url: https://static.openfoodfacts.org/data/taxonomies/additives.full.json
-        - name: allergen
+        - name: allergens
          url: https://static.openfoodfacts.org/data/taxonomies/allergens.full.json
-        - name: amino_acid
+        - name: amino_acids
          url: https://static.openfoodfacts.org/data/taxonomies/amino_acids.full.json
-        - name: country
+        - name: countries
          url: https://static.openfoodfacts.org/data/taxonomies/countries.full.json
        - name: data_quality
          url: 
https://static.openfoodfacts.org/data/taxonomies/data_quality.full.json - - name: food_group + - name: food_groups url: https://static.openfoodfacts.org/data/taxonomies/food_groups.full.json - - name: improvement + - name: improvements url: https://static.openfoodfacts.org/data/taxonomies/improvements.full.json - - name: ingredient + - name: ingredients url: https://static.openfoodfacts.org/data/taxonomies/ingredients.full.json - name: ingredients_analysis url: https://static.openfoodfacts.org/data/taxonomies/ingredients_analysis.full.json - name: ingredients_processing url: https://static.openfoodfacts.org/data/taxonomies/ingredients_processing.full.json - - name: language + - name: languages url: https://static.openfoodfacts.org/data/taxonomies/languages.full.json - - name: mineral + - name: minerals url: https://static.openfoodfacts.org/data/taxonomies/minerals.full.json - name: misc url: https://static.openfoodfacts.org/data/taxonomies/misc.full.json - - name: nova_group + - name: nova_groups url: https://static.openfoodfacts.org/data/taxonomies/nova_groups.full.json - - name: nucleotide + - name: nucleotides url: https://static.openfoodfacts.org/data/taxonomies/nucleotides.full.json - - name: nutrient + - name: nutrients url: https://static.openfoodfacts.org/data/taxonomies/nutrients.full.json - - name: origin + - name: origins url: https://static.openfoodfacts.org/data/taxonomies/origins.full.json - - name: other_nutritional_substance + - name: other_nutritional_substances url: https://static.openfoodfacts.org/data/taxonomies/other_nutritional_substances.full.json - - name: packaging_material + - name: packaging_materials url: https://static.openfoodfacts.org/data/taxonomies/packaging_materials.full.json - name: packaging_recycling url: https://static.openfoodfacts.org/data/taxonomies/packaging_recycling.full.json - - name: packaging_shape + - name: packaging_shapes url: https://static.openfoodfacts.org/data/taxonomies/packaging_shapes.full.json - name: periods_after_opening url: https://static.openfoodfacts.org/data/taxonomies/periods_after_opening.full.json - name: preservation url: https://static.openfoodfacts.org/data/taxonomies/preservation.full.json - - name: state + - name: states url: https://static.openfoodfacts.org/data/taxonomies/states.full.json - - name: vitamin + - name: vitamins url: https://static.openfoodfacts.org/data/taxonomies/vitamins.full.json - - name: brand + - name: brands url: https://static.openfoodfacts.org/data/taxonomies/brands.full.json - exported_langs: - - en - - fr - - es - - de - - it - - nl index: number_of_replicas: 1 number_of_shards: 4 name: off_taxonomy supported_langs: + # a specific language to put the main language entry + - main - aa - ab - ae @@ -386,7 +379,6 @@ indices: - wa - wo - xh - - xx - yi - yo - zh diff --git a/docker-compose.yml b/docker-compose.yml index 4467b325..52f562e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,7 +30,6 @@ x-base-es-envs: &base-es-envs http.cors.allow-headers: X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization http.cors.allow-credentials: "true" - x-api-common: &api-common image: ghcr.io/openfoodfacts/search-a-licious/search_service_image:${TAG:-dev} restart: ${RESTART_POLICY:-always} @@ -52,6 +51,7 @@ x-api-common: &api-common - common_net volumes: - ./data:/opt/search/data + - es_synonyms:/opt/search/synonyms services: @@ -63,6 +63,7 @@ services: discovery.seed_hosts: es02 volumes: - esdata01:/usr/share/elasticsearch/data + - 
es_synonyms:/usr/share/elasticsearch/config/synonyms
 
   es02:
     <<: *base-es-service
@@ -74,6 +75,7 @@
       - es01
     volumes:
       - esdata02:/usr/share/elasticsearch/data
+      - es_synonyms:/usr/share/elasticsearch/config/synonyms
 
   # elasticsearch browser
   elasticvue:
@@ -125,6 +127,7 @@
 volumes:
   esdata01:
   esdata02:
+  es_synonyms:
 
 networks:
   # this is the network shared with product opener
diff --git a/docker/prod.yml b/docker/prod.yml
index 1684011f..7406c5d7 100644
--- a/docker/prod.yml
+++ b/docker/prod.yml
@@ -9,6 +9,9 @@ volumes:
   esdata02:
     external: true
     name: ${COMPOSE_PROJECT_NAME}_esdata02
+  es_synonyms:
+    external: true
+    name: ${COMPOSE_PROJECT_NAME}_es_synonyms
 
 networks:
   common_net:
diff --git a/docs/users/explain-configuration.md b/docs/users/explain-configuration.md
index 23bb158e..87af06c7 100644
--- a/docs/users/explain-configuration.md
+++ b/docs/users/explain-configuration.md
@@ -50,13 +50,64 @@ You have to plan in advance how you configure this.
 Think well about:
 
 * fields you want to search and how you want to search them
-* which informations you need to display in search results
+* which information you need to display in search results
 * what you need to sort on
 * which facets you want to display
 * which charts you need to build
 
 Changing this section will probably involve a full re-indexing of all your items.
 
+Some typical configurations for fields:
+
+A `tags` field whose values are searched as exact values (aka keywords), e.g. a tag:
+```yaml
+tags:
+  type: keyword
+```
+
+An `ingredients` field that is used for full text search when no field is specified:
+```yaml
+ingredients:
+  type: text
+  full_text_search: true
+```
+
+A field `product_name` that is used for full text search, but with multilingual support:
+```yaml
+product_name:
+  full_text_search: true
+  type: text_lang
+```
+
+A `scans_n` field that is an integer:
+```yaml
+scans_n:
+  type: integer
+```
+
+A `specific_warnings` field that is used for full text search,
+but only when you specify the field:
+```yaml
+specific_warnings:
+  type: text
+```
+
+A field `brands_tags` whose value needs to be split into multiple values (according to the `split_separator` option):
+```yaml
+brands_tags:
+  type: keyword
+  split: true
+```
+
+A field `labels_tags` that is used for exact match but with the support of a taxonomy,
+and that can be used for faceting and bar graph generation:
+```yaml
+labels_tags:
+  type: keyword
+  taxonomy_name: label
+  bucket_agg: true
+```
+
 Read more in the [reference documentation](./ref-config/searchalicious-config-schema.html#fields).
 
 ## Document fetcher, pre-processors and post-processors
diff --git a/docs/users/explain-taxonomies.md b/docs/users/explain-taxonomies.md
new file mode 100644
index 00000000..9c90b80f
--- /dev/null
+++ b/docs/users/explain-taxonomies.md
@@ -0,0 +1,65 @@
+# Explain taxonomies
+
+Taxonomies are a way to organize the categorization of items.
+
+Normally, a taxonomy is about a specific field.
+For each possible value, it defines translations in different languages, as well as possible synonyms (in each language).
+For each entry, we have a canonical identifier.
+
+A taxonomy also organizes the entries within a directed acyclic graph (a hierarchy, but with the possibility of multiple parents, while always avoiding cycles).
+For example, it may describe that a salmon is a marine fish as well as a freshwater fish, and an oily fish.
+
+It can be used to help users find items using a specific field, in their language, even if they use a synonym for it.
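
To make this concrete, here is a small, hand-written sketch of what two parsed taxonomy entries may look like, shown as a Python literal. The entry names and synonyms are invented for illustration; the JSON format itself is described in the "Listing taxonomies" section that follows.

```python
# Illustrative only: two taxonomy entries as they might appear once the JSON
# file is parsed. Keys are canonical entry ids; each entry may carry
# per-language names and synonyms (both optional).
taxonomy_excerpt = {
    "en:organic": {
        "name": {"en": "Organic", "fr": "Bio"},
        "synonyms": {"en": ["organically grown"], "fr": ["biologique"]},
    },
    "en:fair-trade": {
        "name": {"en": "Fair trade", "fr": "Commerce équitable"},
    },
}
```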
+
+## Listing taxonomies
+
+If you plan to use taxonomies, you should first list them in the [taxonomy section of the configuration](./ref-config/searchalicious-config-schema.html#indices_additionalProperties_taxonomy).
+
+Taxonomies must come in a JSON format that can be downloaded from a particular URL.
+
+The data in the JSON must contain an object, where:
+* each key corresponds to the id of a taxonomy entry
+* each value is an object, with the following fields (none are mandatory):
+  * `name`: an object associating each language code
+    with the entry name in that language (useful for translating the entry)
+  * `synonyms`: an object associating each language code
+    with an array of synonyms for this entry in that language
+
+## Taxonomy fields
+
+As you define your [fields in the configuration](./explain-configuration.md#fields),
+you can specify that a field is a taxonomy field (`type: taxonomy`).
+
+In this case, you also have to provide the following fields:
+* `taxonomy_name`: the name of the taxonomy (as defined in the configuration)
+
+* `synonyms_search`: if true,
+  this will add a full text subfield that enables using synonyms and translations to match this term.
+
+
+## Autocompletion with taxonomies
+
+When you import taxonomies, they can be used to provide autocompletion in multiple ways.
+
+The web components can use them to add values to facets,
+or to provide suggestions in the search bar.
+
+You can also use the [autocompletion API](../ref-openapi/#operation/taxonomy_autocomplete_autocomplete_get).
+
+## Importing taxonomies
+
+If you have defined taxonomies,
+you must import them using the [import-taxonomies command](../devs/ref-python/cli.html#python3-m-app-import-taxonomies).
+
+
+## Technical details on taxonomy fields
+
+A taxonomy field is stored in Elasticsearch as an object.
+For each language it has a specific field, but in this field we just store the taxonomy entry id (e.g. for organic, we always store `en:organic`). The indexing analyzer is close to a plain `keyword` analyzer, which means the value is not tokenized (not exactly `keyword`, though, as we also transform hyphens into underscores).
+
+Note that the value of this field must be seen as a single token by the Elasticsearch standard tokenizer.
+So you should only use letters, numbers, colons and the underscore.
+As an exception, we allow the hyphen character, transforming it to "_" before tokenization.
+
+Those fields also have a specific *search analyzer*, so that when you enter a search query,
+the query text is tokenized using the standard analyzer, then lowercased, and we then look for synonyms in the taxonomy (the analyzers are defined in `app/utils/analyzers.py`; a short usage sketch is shown further below).
\ No newline at end of file
diff --git a/docs/users/ref-web-components.md b/docs/users/ref-web-components.md
index 00669f8a..4afe261e 100644
--- a/docs/users/ref-web-components.md
+++ b/docs/users/ref-web-components.md
@@ -4,6 +4,8 @@ This page documents [web Components](https://developer.mozilla.org/en-US/docs/We
 provided by Search-a-licious to quickly build your interfaces.
 
+See the [tutorial for an introduction](./tutorial.md#building-a-search-interface).
+
 ## Customization
 
 ### Styling
@@ -16,7 +18,6 @@ We only translated basic messages and most labels can generally be overridden us
 If you however needs to override current translations, you might clone this project, change
 translations in xliff files and regenerate the bundle.
 
-
 ## Main components
 
 Those are the components you will certainly use to build your interface.
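
Coming back to the taxonomy search behaviour described in `docs/users/explain-taxonomies.md` above (not to the web components): the sketch below shows how the two analyzers introduced in `app/utils/analyzers.py` in this changeset are meant to be paired for a taxonomized field. It is illustrative only; `"labels"` and `"en"` are example values, and the resulting analyzer objects would normally be attached to the index mapping rather than used directly.

```python
# Illustrative sketch, not part of the diff above.
from app.utils.analyzers import (
    get_taxonomy_indexing_analyzer,
    get_taxonomy_search_analyzer,
)

# Indexing: the whole entry id (e.g. "en:organic") is kept as a single token
# (keyword tokenizer), with hyphens mapped to underscores by the shared char filter.
indexing_analyzer = get_taxonomy_indexing_analyzer("labels", "en")

# Searching: the query is tokenized with the standard tokenizer, lowercased,
# run through a language normalizer (asciifolding by default) and, since
# with_synonyms=True, expanded using the synonyms/labels/en.txt file.
search_analyzer = get_taxonomy_search_analyzer("labels", "en", with_synonyms=True)
```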
diff --git a/frontend/public/off.html b/frontend/public/off.html index d7f24766..cda33377 100644 --- a/frontend/public/off.html +++ b/frontend/public/off.html @@ -286,7 +286,7 @@