Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added max fields and doc size limits #135

Merged
merged 8 commits into from
Oct 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 43 additions & 33 deletions src/marqo/_httprequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
DocumentNotFoundError,
IndexAlreadyExistsError,
InvalidIndexNameError,
HardwareCompatabilityError
HardwareCompatabilityError,
IndexMaxFieldsError
)
from urllib3.exceptions import InsecureRequestWarning
import warnings

ALLOWED_OPERATIONS = {requests.delete, requests.get, requests.post, requests.put}

Expand Down Expand Up @@ -45,38 +48,40 @@ def send_request(
if content_type is not None and content_type:
req_headers['Content-Type'] = content_type

try:
request_path = self.config.url + '/' + path
if isinstance(body, bytes):
response = http_method(
request_path,
timeout=self.config.timeout,
headers=req_headers,
data=body,
verify=to_verify
)
elif isinstance(body, str):
response = http_method(
request_path,
timeout=self.config.timeout,
headers=req_headers,
data=body,
verify=to_verify
)
else:
response = http_method(
request_path,
timeout=self.config.timeout,
headers=req_headers,
data=json.dumps(body) if body else None,
verify=to_verify
)
return self.__validate(response)

except requests.exceptions.Timeout as err:
raise BackendTimeoutError(str(err)) from err
except requests.exceptions.ConnectionError as err:
raise BackendCommunicationError(str(err)) from err
with warnings.catch_warnings():
if not self.config.cluster_is_remote:
warnings.simplefilter('ignore', InsecureRequestWarning)
try:
request_path = self.config.url + '/' + path
if isinstance(body, bytes):
response = http_method(
request_path,
timeout=self.config.timeout,
headers=req_headers,
data=body,
verify=to_verify
)
elif isinstance(body, str):
response = http_method(
request_path,
timeout=self.config.timeout,
headers=req_headers,
data=body,
verify=to_verify
)
else:
response = http_method(
request_path,
timeout=self.config.timeout,
headers=req_headers,
data=json.dumps(body) if body else None,
verify=to_verify
)
return self.__validate(response)
except requests.exceptions.Timeout as err:
raise BackendTimeoutError(str(err)) from err
except requests.exceptions.ConnectionError as err:
raise BackendCommunicationError(str(err)) from err

def get(
self, path: str,
Expand Down Expand Up @@ -163,6 +168,11 @@ def convert_to_marqo_web_error_and_raise(response: requests.Response, err: reque
raise HardwareCompatabilityError(
message=f"Filtering is not yet supported for arm-based architectures"
) from err
elif open_search_error_type == "illegal_argument_exception":
reason = response_dict["error"]["reason"].lower()
if "limit of total fields" in reason and "exceeded" in reason:
raise IndexMaxFieldsError(message="Exceeded maximum number of "
"allowed fields for this index.")
except KeyError:
pass

Expand Down
7 changes: 0 additions & 7 deletions src/marqo/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from typing import Optional, Union
import urllib3
import warnings
from marqo.tensor_search import enums


Expand All @@ -22,23 +20,18 @@ def __init__(
self.cluster_is_s2search = False
self.url = self.set_url(url)
self.timeout = timeout

default_device = enums.Device.cpu

self.indexing_device = indexing_device if indexing_device is not None else default_device
self.search_device = search_device if search_device is not None else default_device
# suppress warnings until we figure out the dependency issues:
warnings.filterwarnings("ignore")

def set_url(self, url):
"""Set the URL, and infers whether that url is remote"""
lowered_url = url.lower()
local_host_markers = ["localhost", "0.0.0.0", "127.0.0.1"]
if any([marker in lowered_url for marker in local_host_markers]):
urllib3.disable_warnings()
self.cluster_is_remote = False
else:
warnings.resetwarnings()
self.cluster_is_remote = True
if "s2search.io" in lowered_url:
self.cluster_is_s2search = True
Expand Down
12 changes: 12 additions & 0 deletions src/marqo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class InvalidArgError(__InvalidRequestError):
status_code = HTTPStatus.BAD_REQUEST


class DocTooLargeError(__InvalidRequestError):
    """Error when attempting to index a document that exceeds the allowed
    document size limit"""
    code = "doc_too_large"
    status_code = HTTPStatus.BAD_REQUEST


class BadRequestError(__InvalidRequestError):
code = "bad_request"
status_code = HTTPStatus.BAD_REQUEST
Expand All @@ -142,6 +147,13 @@ class HardwareCompatabilityError(__InvalidRequestError):
code = "hardware_compatability_error"
status_code = HTTPStatus.BAD_REQUEST


class IndexMaxFieldsError(__InvalidRequestError):
    """Error when attempting to index a document that increases the index's number of
    fields above the index limit"""
    code = "index_max_fields_error"
    status_code = HTTPStatus.BAD_REQUEST

# ---MARQO INTERNAL ERROR---


Expand Down
11 changes: 5 additions & 6 deletions src/marqo/tensor_search/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ def add_customer_field_properties(config: Config, index_name: str,
# copy fields to the chunk for prefiltering. If it is text, convert it to a keyword type to save space
# if it's not text, ignore it, and leave it up to OpenSearch (e.g: if it's a number)
for field_name in customer_field_names:
nested_non_vector_field_type = enums.OpenSearchDataType.keyword if field_name[1] == enums.OpenSearchDataType.text else field_name[1]
if nested_non_vector_field_type == enums.OpenSearchDataType.text \
or nested_non_vector_field_type == enums.OpenSearchDataType.keyword:
if field_name[1] == enums.OpenSearchDataType.text \
or field_name[1] == enums.OpenSearchDataType.keyword:
body["properties"][enums.TensorField.chunks]["properties"][validation.validate_field_name(field_name[0])] = {
"type": enums.OpenSearchDataType.keyword
}
"type": enums.OpenSearchDataType.keyword,
"ignore_above": 32766 # this is the Marqo-OS bytes limit
}

mapping_res = HttpRequests(config).put(path=F"{index_name}/_mapping", body=json.dumps(body))

Expand All @@ -137,7 +137,6 @@ def add_customer_field_properties(config: Config, index_name: str,
new_index_properties[validation.validate_field_name(new_prop)] = {
"type": type_to_set
}

get_cache()[index_name] = IndexInfo(
model_name=existing_info.model_name,
properties=new_index_properties,
Expand Down
10 changes: 9 additions & 1 deletion src/marqo/tensor_search/configs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from marqo.tensor_search import enums as ns_enums
from marqo.tensor_search.enums import IndexSettingsField as NsFields
from marqo.tensor_search.enums import IndexSettingsField as NsFields, EnvVars
from torch import multiprocessing as mp


Expand All @@ -23,3 +23,11 @@ def get_default_index_settings():
NsFields.number_of_shards: 5
}


def default_env_vars() -> dict:
    """Build the fallback values used when an environment variable is unset.

    A new dict is created on every call, keyed by the names declared on
    EnvVars. A value of None means that no default is configured for that
    variable.

    Returns:
        A dict mapping each environment variable name to its default value.
    """
    defaults = {}
    defaults[EnvVars.MARQO_MAX_INDEX_FIELDS] = None
    defaults[EnvVars.MARQO_MAX_DOC_BYTES] = 100000
    defaults[EnvVars.MARQO_MAX_RETRIEVABLE_DOCS] = None
    return defaults
8 changes: 8 additions & 0 deletions src/marqo/tensor_search/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ class OpenSearchDataType:
float = "float"
integer = "integer"
to_be_defined = "to_be_defined" # to be defined by OpenSearch


class EnvVars:
    """Names of the environment variables that Marqo reads.

    Each attribute's value is the literal name of the environment variable.
    """
    # Read when a vector index is created, to limit the index's number of fields.
    MARQO_MAX_INDEX_FIELDS = "MARQO_MAX_INDEX_FIELDS"
    # Read during document validation, to limit the size of the serialized document.
    MARQO_MAX_DOC_BYTES = "MARQO_MAX_DOC_BYTES"
    # NOTE(review): no reader of this variable is visible here; presumably it
    # limits how many documents a search can return. Confirm against its usage.
    MARQO_MAX_RETRIEVABLE_DOCS = "MARQO_MAX_RETRIEVABLE_DOCS"


43 changes: 41 additions & 2 deletions src/marqo/tensor_search/tensor_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import uuid
from typing import List, Optional, Union, Iterable, Sequence, Dict, Any
from PIL import Image
from marqo.tensor_search.enums import MediaType, MlModel, TensorField, SearchMethod, OpenSearchDataType
from marqo.tensor_search.enums import (
MediaType, MlModel, TensorField, SearchMethod, OpenSearchDataType,
EnvVars
)
from marqo.tensor_search.enums import IndexSettingsField as NsField
from marqo.tensor_search import utils, backend, validation, configs, parallel
from marqo.tensor_search.formatting import _clean_doc
Expand Down Expand Up @@ -82,12 +85,23 @@ def create_vector_index(
"knn.algo_param.ef_search": 100,
"refresh_interval": refresh_interval
},
"number_of_shards": the_index_settings[NsField.number_of_shards]
"number_of_shards": the_index_settings[NsField.number_of_shards],

},
"mappings": {
"_meta": {
"media_type": media_type,
},
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "text"
}
}
}
],
"properties": {
TensorField.chunks: {
"type": "nested",
Expand All @@ -103,7 +117,11 @@ def create_vector_index(
}
}
}
max_marqo_fields = utils.read_env_vars_and_defaults(EnvVars.MARQO_MAX_INDEX_FIELDS)

if max_marqo_fields is not None:
max_os_fields = _marqo_field_limit_to_os_limit(int(max_marqo_fields))
vector_index_settings["settings"]["mapping"] = {"total_fields": {"limit": int(max_os_fields)}}
model_name = the_index_settings[NsField.index_defaults][NsField.model]
vector_index_settings["mappings"]["_meta"][NsField.index_settings] = the_index_settings
vector_index_settings["mappings"]["_meta"]["model"] = model_name
Expand All @@ -117,6 +135,27 @@ def create_vector_index(
return response


def _marqo_field_limit_to_os_limit(marqo_index_field_limit: int) -> int:
"""Translates a Marqo Index Field limit (that a Marqo user will set)
into the equivalent limit for Marqo-OS

Each Marqo field generates 3 Marqo-OS fields:
- One for its content
- One for its vector
- One for filtering

There are also 3 fields that will be generated on a Marqo index, in most
cases:
- one for the chunks field
- one for chunk's __field_content
- one for chunk's __field_name

Returns:
The corresponding Marqo-OS limit
"""
return (marqo_index_field_limit * 3) + 3


def _autofill_index_settings(index_settings: dict):
"""A half-complete index settings will be auto filled"""

Expand Down
21 changes: 19 additions & 2 deletions src/marqo/tensor_search/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import typing
from typing import List
import functools
import json
import torch
from marqo import errors
from marqo.tensor_search import enums
from marqo.tensor_search import enums, configs
from typing import List, Optional, Union, Callable, Iterable, Sequence, Dict
import copy
import datetime
Expand Down Expand Up @@ -159,3 +159,20 @@ def merge(merged: dict, prefs: dict) -> dict:
return merged

return merge(merged=merged_dicts, prefs=preferences)


def read_env_vars_and_defaults(var: str) -> Optional[str]:
    """Look up a setting, preferring the process environment over defaults.

    The environment is consulted first. If the variable is not set there,
    the value registered in configs.default_env_vars() is used instead.

    Note that defaults are returned exactly as stored in the defaults dict,
    so they are not necessarily strings; callers that need a number should
    convert the result themselves.

    Args:
        var: name of the environment variable to read.

    Returns:
        The environment value if set, otherwise the configured default,
        otherwise None.
    """
    env_value = os.environ.get(var)
    if env_value is not None:
        # An explicitly set variable (even an empty string) always wins.
        return env_value
    return configs.default_env_vars().get(var)



21 changes: 19 additions & 2 deletions src/marqo/tensor_search/validation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import json
import pprint
import typing
from marqo.tensor_search import constants
from marqo.tensor_search import enums
from marqo.tensor_search import enums, utils
from typing import Iterable, Container
from marqo.errors import MarqoError, InvalidFieldNameError, InvalidArgError, InternalError, InvalidDocumentIdError
from marqo.errors import (
MarqoError, InvalidFieldNameError, InvalidArgError, InternalError,
InvalidDocumentIdError, DocTooLargeError)
from marqo.tensor_search.enums import TensorField
from marqo.tensor_search import constants
from typing import Any, Type
Expand Down Expand Up @@ -96,8 +99,22 @@ def validate_doc(doc: dict) -> dict:
"""
if not isinstance(doc, dict):
raise InvalidArgError("Docs must be dicts")

if len(doc) <= 0:
raise InvalidArgError("Can't index an empty dict.")

max_doc_size = utils.read_env_vars_and_defaults(var=enums.EnvVars.MARQO_MAX_DOC_BYTES)
if max_doc_size is not None:
try:
serialized = json.dumps(doc)
except TypeError as e:
raise InvalidArgError(f"Unable to index document: it is not serializable! Document: `{doc}` ")
if len(serialized) > int(max_doc_size):
maybe_id = f" _id:`{doc['_id']}`" if '_id' in doc else ''
raise DocTooLargeError(
f"Document{maybe_id} with length `{len(serialized)}` exceeds "
f"the allowed document size limit of [{max_doc_size}]."
)
return doc


Expand Down
1 change: 1 addition & 0 deletions tests/marqo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from marqo.tensor_search.utils import construct_authorized_url
from marqo import config


class MarqoTestCase(unittest.TestCase):

@classmethod
Expand Down
6 changes: 0 additions & 6 deletions tests/s2_inference/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@

class TestOutputs(unittest.TestCase):

def setUp(self) -> None:

pass


def test_check_output(self):
# tests for checking the output type standardization
list_o_list = [[1,2]]
Expand All @@ -40,7 +35,6 @@ def test_create_model_cache_key(self):
for name in names:
for device in devices:
assert _create_model_cache_key(name, device) == (name, device)


def test_clear_model_cache(self):
# tests clearing the model cache
Expand Down
Loading