Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Marqo Document API Response Format #911

Merged
merged 46 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
726da16
Patch the vespa client error handling logic
wanliAlex Jul 19, 2024
b088782
Fix unit tests
wanliAlex Jul 22, 2024
5a3a824
Update version to 2.10.2
wanliAlex Jul 22, 2024
ea6eecf
Fix tests
wanliAlex Jul 22, 2024
097d8e8
Farshid's comments
wanliAlex Jul 22, 2024
2549c1c
Farshid's comments
wanliAlex Jul 22, 2024
7c576c4
Raise vespa error when we received an unexpected response
wanliAlex Jul 22, 2024
eb455b3
Revert the batch document response changes
wanliAlex Jul 22, 2024
0b37f1e
Revert the batch document response changes
wanliAlex Jul 22, 2024
387770a
Merge branch 'mainline' into li/patch-batch
wanliAlex Jul 22, 2024
6d04edf
New version of PR
wanliAlex Jul 23, 2024
eaaa963
Fix unit tests
wanliAlex Jul 23, 2024
49f3dad
Fix unit tests
wanliAlex Jul 23, 2024
1b25f3f
Typo
wanliAlex Jul 23, 2024
e1e2c7f
Changes
wanliAlex Jul 24, 2024
fa03e6c
Fix pydantic models
wanliAlex Jul 24, 2024
a2bb0fb
Fix get documents
wanliAlex Jul 30, 2024
a0a34a6
return dict in tensor_search.add_documents
wanliAlex Jul 30, 2024
d164d34
Remove None issue for responses
wanliAlex Jul 30, 2024
6252858
Move count to pydantic classes
wanliAlex Jul 31, 2024
f6417de
Move count to pydantic models
wanliAlex Jul 31, 2024
0009280
Set version to 2.11.0
wanliAlex Jul 31, 2024
749cf2d
Merge branch 'mainline' into li/patch-batch
wanliAlex Jul 31, 2024
9937a52
Fix all the add_documents tests
wanliAlex Jul 31, 2024
bbfa928
Fix all tests
wanliAlex Jul 31, 2024
f58eb11
Fix get documents response
wanliAlex Jul 31, 2024
a747559
Fix get documents tests
wanliAlex Jul 31, 2024
bc3c797
Finish update documents tests
wanliAlex Jul 31, 2024
8151bf4
Update documents code
wanliAlex Jul 31, 2024
fcb9886
Fix some code logic
wanliAlex Jul 31, 2024
0000ef0
Fix a typo
wanliAlex Jul 31, 2024
af4508d
Still return 400 for special characters
wanliAlex Aug 1, 2024
133c8c5
Use self.assertIn in the tests
wanliAlex Aug 1, 2024
e2d598d
Revise upon Yihan's comments
wanliAlex Aug 1, 2024
b9d9d48
Fix typo
wanliAlex Aug 1, 2024
941507d
Fix tests
wanliAlex Aug 1, 2024
c9b21b8
Fix tests
wanliAlex Aug 1, 2024
40c0fa6
Fix tests
wanliAlex Aug 1, 2024
14e6c2c
Fix tests
wanliAlex Aug 1, 2024
c29e43d
Add a logger when returning 500 to users in document APIs
wanliAlex Aug 1, 2024
15034ea
Add more tests
wanliAlex Aug 2, 2024
60d33fd
Merge branch 'mainline' into li/patch-batch
wanliAlex Aug 2, 2024
c601f31
Fix typo
wanliAlex Aug 2, 2024
3fc7c94
Fix typo
wanliAlex Aug 2, 2024
23ed8d7
Add back the resp.text
wanliAlex Aug 2, 2024
c813656
Merge branch 'mainline' into li/patch-batch
farshidz Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 79 additions & 11 deletions src/marqo/core/document/document.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from timeit import default_timer as timer
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional

import marqo.api.exceptions as api_exceptions
from marqo.core.constants import MARQO_DOC_ID
from marqo.core.exceptions import UnsupportedFeatureError, ParsingError
from marqo.core.index_management.index_management import IndexManagement
from marqo.core.models.marqo_add_documents_response import MarqoAddDocumentsResponse, MarqoAddDocumentsItem
from marqo.core.models.marqo_index import IndexType
from marqo.core.models.marqo_update_documents_response import MarqoUpdateDocumentsResponse, MarqoUpdateDocumentsItem
from marqo.core.vespa_index import for_marqo_index as vespa_index_factory
from marqo.logging import get_logger
from marqo.vespa.models import UpdateDocumentsBatchResponse, VespaDocument
from marqo.vespa.models.delete_document_response import DeleteAllDocumentsResponse
from marqo.vespa.models.feed_response import FeedBatchResponse
from marqo.vespa.vespa_client import VespaClient
from marqo.core.constants import MARQO_DOC_ID
from marqo.logging import get_logger

logger = get_logger(__name__)

Expand Down Expand Up @@ -127,20 +129,22 @@ def _translate_update_document_response(self, responses: UpdateDocumentsBatchRes
MarqoUpdateDocumentsResponse containing the response of the partial update operation
"""

new_items: List[MarqoUpdateDocumentsItem] = []
items: List[MarqoUpdateDocumentsItem] = []

errors = responses.errors

if responses is not None:
for resp in responses.responses:
doc_id = resp.id.split('::')[-1] if resp.id else None
item = MarqoUpdateDocumentsItem(
id=doc_id, status=resp.status, message=resp.message
)
new_items.append(item)
status, message = self.translate_vespa_document_response(resp.status)
new_item = MarqoUpdateDocumentsItem(id=doc_id, status=status, message=message, error=message)
items.append(new_item)

for loc, error_info in unsuccessful_docs:
new_items.insert(loc, error_info)
items.insert(loc, error_info)
errors = True

return MarqoUpdateDocumentsResponse(index_name=index_name, items=new_items,
return MarqoUpdateDocumentsResponse(errors=errors, index_name=index_name, items=items,
processingTimeMs=(timer() - start_time) * 1000)

def remove_duplicated_documents(self, documents: List) -> Tuple[List, set]:
Expand Down Expand Up @@ -170,4 +174,68 @@ def remove_duplicated_documents(self, documents: List) -> Tuple[List, set]:
docs.append(doc)
# Reverse to preserve order in request
docs.reverse()
return docs, doc_ids
return docs, doc_ids

def translate_add_documents_response(self, responses: Optional[FeedBatchResponse],
                                     index_name: str,
                                     unsuccessful_docs: List,
                                     add_docs_processing_time: float) \
        -> MarqoAddDocumentsResponse:
    """Build the Marqo add-documents response from a Vespa feed batch response.

    Args:
        responses: The batch response returned by Vespa, or None when nothing was
            sent to Vespa at all
        index_name: The name of the index the documents were added to
        unsuccessful_docs: (position, item) pairs for documents rejected before
            reaching Vespa; each item is inserted at its position in the result
        add_docs_processing_time: The processing time of the add documents
            operation; forwarded unchanged as ``processingTimeMs``
            NOTE(review): the previous docstring described this value as
            seconds while the target field is named *Ms* - confirm the unit
            with the caller.

    Return:
        MarqoAddDocumentsResponse: The response of the add documents operation
    """
    translated_items: List[MarqoAddDocumentsItem] = []

    if responses is None:
        # Nothing reached Vespa, most likely because every document was
        # rejected by Marqo beforehand, so the batch is reported as failed.
        has_errors = True
    else:
        has_errors = responses.errors
        for vespa_resp in responses.responses:
            # Vespa ids are '::'-separated; the last segment is the Marqo id.
            marqo_id = vespa_resp.id.split('::')[-1] if vespa_resp.id else None
            status, message = self.translate_vespa_document_response(
                vespa_resp.status, message=vespa_resp.message
            )
            translated_items.append(
                MarqoAddDocumentsItem(id=marqo_id, status=status, message=message)
            )

    # Put the locally rejected documents back at their original positions.
    for position, rejected_item in unsuccessful_docs:
        translated_items.insert(position, rejected_item)
        has_errors = True

    return MarqoAddDocumentsResponse(
        errors=has_errors,
        index_name=index_name,
        items=translated_items,
        processingTimeMs=add_docs_processing_time,
    )

def translate_vespa_document_response(self, status: int, message: Optional[str]=None) -> Tuple[int, Optional[str]]:
    """Map a per-document Vespa status onto the status and message used in
    Marqo document API responses.

    Args:
        status: The status code from Vespa document response
        message: The message attached to the Vespa document response, if any.
            It is only consulted for status codes without a fixed translation.

    Return:
        A tuple of status code and the message in the response
    """
    not_found_message = "Document does not exist in the index"
    # Status codes whose translation does not depend on the Vespa message.
    fixed_translations = {
        200: (200, None),
        404: (404, not_found_message),
        # Update documents get 412 from Vespa for document not found as we use condition
        412: (404, not_found_message),
        429: (429, "Marqo vector store receives too many requests. Please try again later"),
        507: (400, "Marqo vector store is out of memory or disk space"),
    }

    translated = fixed_translations.get(status)
    if translated is not None:
        return translated

    # TODO Block the invalid special characters before sending to Vespa
    is_unparsable_field = (
        status == 400
        and isinstance(message, str)
        and "could not parse field" in message
    )
    if is_unparsable_field:
        return 400, f"The document contains invalid characters in the fields. Original error: {message} "

    # Anything else is unexpected: log it and report a server-side error.
    logger.error(f"An unexpected error occurred from the Vespa document response. "
                 f"status: {status}, message: {message}")
    return 500, f"Marqo vector store returns an unexpected error with this document. Original error: {message}"
62 changes: 62 additions & 0 deletions src/marqo/core/models/marqo_add_documents_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import List, Optional, Any, Dict

from pydantic import Field, root_validator

from marqo.base_model import MarqoBaseModel


class MarqoAddDocumentsItem(MarqoBaseModel):
    """The outcome of adding a single document to Marqo.

    Wraps the per-document result coming back from the Marqo vector store in
    a shape that is friendly to API users.
    """
    status: int
    # Typed as Any on purpose: when a request carries an invalid id, that
    # value is echoed back here whatever its type is.
    id: Any = Field(default=None, alias="_id")
    message: Optional[str] = Field(default=None)
    error: Optional[str] = Field(default=None)
    code: Optional[str] = Field(default=None)


class BatchResponseStats(MarqoBaseModel):
    """Per-batch counters of successful, failed and errored items."""
    success_count: int = 0
    error_count: int = 0
    failure_count: int = 0

    def get_header_dict(self) -> Dict[str, str]:
        """Return the counters as ``x-count-*`` header names mapped to strings."""
        header_counts = (
            ("x-count-success", self.success_count),
            ("x-count-failure", self.failure_count),
            ("x-count-error", self.error_count),
        )
        return {header: str(count) for header, count in header_counts}


class MarqoAddDocumentsResponse(MarqoBaseModel):
    """The response body of the add-documents operation.

    Besides the fields below, a validator tallies the item statuses into a
    BatchResponseStats object that is exposed through ``get_header_dict``.
    """
    errors: bool
    processingTimeMs: float
    index_name: str  # TODO Change this to camelCase in the future (Breaking change!)
    items: List[MarqoAddDocumentsItem]

    _batch_response_stats: BatchResponseStats = Field(exclude=True, default_factory=BatchResponseStats)

    @root_validator(pre=False, skip_on_failure=True)
    def count_items(cls, values):
        """Tally items by status class and store the tally in ``values``.

        2xx counts as success, 4xx as failure and 5xx or above as error; any
        other status raises ValueError.
        """
        stats = BatchResponseStats()

        for entry in values.get("items") or ():
            code = entry.status
            if 200 <= code < 300:
                stats.success_count += 1
            elif 400 <= code < 500:
                stats.failure_count += 1
            elif code >= 500:
                stats.error_count += 1
            else:
                raise ValueError(f"Unexpected status code: {entry.status}")

        values['_batch_response_stats'] = stats
        return values

    def get_header_dict(self) -> Dict[str, str]:
        """Return the batch counters as response headers."""
        return self._batch_response_stats.get_header_dict()
58 changes: 58 additions & 0 deletions src/marqo/core/models/marqo_get_documents_by_id_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Optional, Union, List, Dict, Any

from pydantic import Field, root_validator

from marqo.base_model import MarqoBaseModel
from marqo.tensor_search.enums import TensorField
from marqo.core.models.marqo_add_documents_response import BatchResponseStats


class MarqoGetDocumentsByIdsItem(MarqoBaseModel):
    """An entry of MarqoGetDocumentsByIdsResponse.results describing an error.

    This model only covers invalid-request errors. A valid request is
    represented by a plain dictionary holding the document instead.
    """
    # Typed as Any on purpose: when a request carries an invalid id, that
    # value is echoed back here whatever its type is.
    id: Any = Field(default=None, alias="_id")
    status: int
    message: Optional[str] = Field(default=None)
    found: Optional[bool] = Field(default=None, alias=str(TensorField.found))


class MarqoGetDocumentsByIdsResponse(MarqoBaseModel):
    """
    The response body returned when fetching documents from Marqo by id.

    Each entry of ``results`` is either a plain dictionary (the document) or
    a MarqoGetDocumentsByIdsItem describing why the document is not returned.
    """
    errors: bool
    # NOTE(review): pydantic v1 tries Union members left to right, so a
    # document dict that happens to validate as MarqoGetDocumentsByIdsItem
    # could be coerced into one - confirm this cannot happen for real documents.
    results: List[Union[MarqoGetDocumentsByIdsItem, Dict]] = []

    _batch_response_stats: BatchResponseStats = Field(exclude=True, default_factory=BatchResponseStats)

    @root_validator(pre=False, skip_on_failure=True)
    def count_items(cls, values):
        """Tally results by outcome and store the tally in ``values``.

        Dictionaries and 2xx items count as success, 4xx as failure and 5xx
        or above as error; any other status or item type raises ValueError.
        """
        stats = BatchResponseStats()

        for entry in values.get("results") or ():
            if isinstance(entry, dict):
                # a dictionary indicate a successful response
                stats.success_count += 1
                continue

            if not isinstance(entry, MarqoGetDocumentsByIdsItem):
                raise ValueError(f"Unexpected item type: {type(entry)}")

            code = entry.status
            if 200 <= code < 300:
                stats.success_count += 1
            elif 400 <= code < 500:
                stats.failure_count += 1
            elif code >= 500:
                stats.error_count += 1
            else:
                raise ValueError(f"Unexpected status code: {entry.status}")

        values['_batch_response_stats'] = stats
        return values

    def get_header_dict(self) -> Dict[str, str]:
        """Return the batch counters as response headers."""
        return self._batch_response_stats.get_header_dict()
46 changes: 26 additions & 20 deletions src/marqo/core/models/marqo_update_documents_response.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,44 @@
from typing import List, Optional
from typing import List, Optional, Dict

from pydantic import Field, root_validator

from marqo.base_model import MarqoBaseModel
from marqo.core.models.marqo_add_documents_response import BatchResponseStats


class MarqoUpdateDocumentsItem(MarqoBaseModel):
id: str = Field(alias="_id")
id: Optional[str] = Field(alias="_id", default=None)
status: int
message: Optional[str] = None
error: Optional[str] = None

@root_validator(pre=True)
def check_status_and_message(cls, values):
status = values.get('status')
message = values.get('message')
if status >= 400 and message:
values["error"] = message
return values


class MarqoUpdateDocumentsResponse(MarqoBaseModel):
errors: bool
index_name: str
items: List[MarqoUpdateDocumentsItem]
processingTimeMs: float

@root_validator(pre=True)
def check_errors(cls, values):
items = values.get('items')
errors = False
for item in items:
if item.error:
errors = True
break
values["errors"] = errors
return values
_batch_response_stats: BatchResponseStats = Field(exclude=True, default_factory=BatchResponseStats)

@root_validator(pre=False, skip_on_failure=True)
def count_items(cls, values):
items = values.get("items")
batch_response_count = BatchResponseStats()

if items:
for item in items:
if item.status in range(200, 300):
batch_response_count.success_count += 1
elif item.status in range(400, 500):
batch_response_count.failure_count += 1
elif item.status >= 500:
batch_response_count.error_count += 1
else:
raise ValueError(f"Unexpected status code: {item.status}")

values['_batch_response_stats'] = batch_response_count
return values

def get_header_dict(self) -> Dict[str, str]:
return self._batch_response_stats.get_header_dict()
2 changes: 1 addition & 1 deletion src/marqo/core/search/recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def recommend(self,
marqo_documents = tensor_search.get_documents_by_ids(
config.Config(self.vespa_client),
index_name, document_ids, show_vectors=True
)
).dict(exclude_none=True, by_alias=True)

# Make sure all documents were found
not_found = []
Expand Down
10 changes: 6 additions & 4 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pydantic
import uvicorn
from fastapi import Depends, FastAPI, Request
from fastapi import Depends, FastAPI, Request, Response
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
Expand Down Expand Up @@ -317,9 +317,10 @@ def add_or_replace_documents(
device=device)

with RequestMetricsStore.for_request().time(f"POST /indexes/{index_name}/documents"):
return tensor_search.add_documents(
res = tensor_search.add_documents(
config=marqo_config, add_docs_params=add_docs_params
)
return JSONResponse(content=res.dict(exclude_none=True, by_alias=True), headers=res.get_header_dict())


@app.post("/indexes/{index_name}/embed")
Expand Down Expand Up @@ -347,7 +348,7 @@ def update_documents(
res = marqo_config.document.partial_update_documents_by_index_name(
index_name=index_name, partial_documents=body.documents)

return res.dict(exclude_none=True, by_alias=True)
return JSONResponse(content=res.dict(exclude_none=True, by_alias=True), headers=res.get_header_dict())


@app.get("/indexes/{index_name}/documents/{document_id}")
Expand All @@ -365,10 +366,11 @@ def get_documents_by_ids(
index_name: str, document_ids: List[str],
marqo_config: config.Config = Depends(get_config),
expose_facets: bool = False):
return tensor_search.get_documents_by_ids(
res = tensor_search.get_documents_by_ids(
config=marqo_config, index_name=index_name, document_ids=document_ids,
show_vectors=expose_facets
)
return JSONResponse(content=res.dict(exclude_none=True, by_alias=True), headers=res.get_header_dict())


@app.get("/indexes/{index_name}/stats")
Expand Down
Loading
Loading