Timing Logging Enrichment #242

Merged
merged 8 commits into from
Dec 22, 2022
134 changes: 113 additions & 21 deletions src/marqo/tensor_search/tensor_search.py
@@ -32,6 +32,7 @@
"""
import copy
import datetime
from timeit import default_timer as timer
import functools
import pprint
import typing
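
For context on the import swap in this hunk: timeit.default_timer reads the highest-resolution clock available and returns float seconds, so elapsed time becomes a plain subtraction that formats directly with :.3f, rather than a datetime.timedelta. A minimal illustrative sketch (not part of the diff):

# Illustrative only: the elapsed-time pattern that default_timer enables.
from timeit import default_timer as timer

start = timer()
_ = sum(i * i for i in range(1_000_000))   # stand-in for the timed work
elapsed = timer() - start                  # float seconds
print(f"work took {elapsed:.3f}s")
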
@@ -296,17 +297,19 @@ def batch_requests(gathered, doc_tuple):
batched = functools.reduce(lambda x, y: batch_requests(x, y), deeper, [])

def verbosely_add_docs(i, docs):
t0 = datetime.datetime.now()
t0 = timer()

logger.info(f" batch {i}: beginning ingestion. ")
res = add_documents(
config=config, index_name=index_name,
docs=docs, auto_refresh=False, device=device,
update_mode=update_mode, non_tensor_fields=non_tensor_fields
)
total_batch_time = datetime.datetime.now() - t0
total_batch_time = timer() - t0
num_docs = len(docs)

logger.info(f" batch {i}: ingested {num_docs} docs. Time taken: {total_batch_time}. "
f"Average timer per doc {total_batch_time / num_docs}")
logger.info(f" batch {i}: ingested {num_docs} docs. Time taken: {(total_batch_time):.3f}. "
f"Average time per doc {(total_batch_time/num_docs):.3f}")
if verbose:
logger.info(f" results from indexing batch {i}: {res}")
return res
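
A condensed sketch of the per-batch logging pattern this hunk introduces, with a hypothetical ingest_fn standing in for add_documents (names are illustrative, not the PR's code):

from timeit import default_timer as timer
import logging

logger = logging.getLogger(__name__)

def timed_ingest(i, docs, ingest_fn):
    # ingest_fn stands in for add_documents(...)
    t0 = timer()
    res = ingest_fn(docs)
    total = timer() - t0
    num_docs = len(docs) or 1              # guard the per-doc average against an empty batch
    logger.info(f"batch {i}: ingested {len(docs)} docs in {total:.3f}s "
                f"({total / num_docs:.3f}s per doc)")
    return res
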
@@ -349,10 +352,13 @@ def add_documents(config: Config, index_name: str, docs: List[dict], auto_refres
Returns:

"""
# ADD DOCS TIMER-LOGGER (3)
start_time_3 = timer()

if non_tensor_fields is None:
non_tensor_fields = []

t0 = datetime.datetime.now()
t0 = timer()
bulk_parent_dicts = []

try:
@@ -375,6 +381,8 @@
selected_device = config.indexing_device if device is None else device

unsuccessful_docs = []
total_vectorise_time = 0
batch_size = len(docs)

for i, doc in enumerate(docs):

@@ -477,9 +485,17 @@ def add_documents(config: Config, index_name: str, docs: List[dict], auto_refres
try:
# in the future, if we have different underlying vectorising methods, make sure we catch possible
# errors of different types generated here, too.

# ADD DOCS TIMER-LOGGER (4)
start_time = timer()
vector_chunks = s2_inference.vectorise(model_name=index_info.model_name, model_properties=_get_model_properties(index_info), content=content_chunks,
device=selected_device, normalize_embeddings=normalize_embeddings,
infer=infer_if_image)

end_time = timer()
single_vectorise_call = end_time - start_time
total_vectorise_time += single_vectorise_call
logger.debug(f"(4) TIME for single vectorise call: {(single_vectorise_call):.3f}s.")
except (s2_inference_errors.UnknownModelError,
s2_inference_errors.InvalidModelPropertiesError,
s2_inference_errors.ModelLoadError) as model_error:
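
The timer-logger (4) above accumulates each vectorise call into total_vectorise_time, which is reported once per batch further down. A self-contained sketch of that accumulation, with a fake vectorise standing in for s2_inference.vectorise (assumed shapes, illustrative only):

from timeit import default_timer as timer

def fake_vectorise(chunks):                # stand-in for s2_inference.vectorise
    return [[0.0] * 4 for _ in chunks]

total_vectorise_time = 0.0
for content_chunks in (["a", "b"], ["c"]):
    start = timer()
    vectors = fake_vectorise(content_chunks)
    single_call = timer() - start
    total_vectorise_time += single_call
    print(f"(4) single vectorise call: {single_call:.3f}s")
print(f"total vectorise time: {total_vectorise_time:.3f}s")
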
@@ -581,23 +597,41 @@ def merged_doc = [:];
}
})

end_time_3 = timer()
total_preproc_time = end_time_3 - start_time_3
logger.info(f" add_documents pre-processing: took {(total_preproc_time):.3f}s total for {batch_size} docs, "
f"for an average of {(total_preproc_time / batch_size):.3f}s per doc.")

logger.info(f" add_documents vectorise: took {(total_vectorise_time):.3f}s for {batch_size} docs, "
f"for an average of {(total_vectorise_time / batch_size):.3f}s per doc.")

if bulk_parent_dicts:
# the HttpRequest wrapper handles error logic
update_mapping_response = backend.add_customer_field_properties(
config=config, index_name=index_name, customer_field_names=new_fields,
model_properties=_get_model_properties(index_info))


# ADD DOCS TIMER-LOGGER (5)
start_time_5 = timer()
index_parent_response = HttpRequests(config).post(
path="_bulk", body=utils.dicts_to_jsonl(bulk_parent_dicts))
end_time_5 = timer()
total_http_time = end_time_5 - start_time_5
total_index_time = index_parent_response["took"] * 0.001
logger.info(f" add_documents roundtrip: took {(total_http_time):.3f}s to send {batch_size} docs (roundtrip) to Marqo-os, "
f"for an average of {(total_http_time / batch_size):.3f}s per doc.")

logger.info(f" add_documents Marqo-os index: took {(total_index_time):.3f}s for Marqo-os to index {batch_size} docs, "
f"for an average of {(total_index_time / batch_size):.3f}s per doc.")
else:
index_parent_response = None

if auto_refresh:
refresh_response = HttpRequests(config).post(path=F"{index_name}/_refresh")

t1 = datetime.datetime.now()
t1 = timer()

def translate_add_doc_response(response: Optional[dict], time_diff: datetime.timedelta) -> dict:
def translate_add_doc_response(response: Optional[dict], time_diff: float) -> dict:
"""translates OpenSearch response dict into Marqo dict"""
item_fields_to_remove = ['_index', '_primary_term', '_seq_no', '_shards', '_version']
result_dict = {}
@@ -621,7 +655,7 @@ def translate_add_doc_response(response: Optional[dict], time_diff: datetime.tim
for loc, error_info in unsuccessful_docs:
new_items.insert(loc, error_info)

result_dict["processingTimeMs"] = time_diff.total_seconds() * 1000
result_dict["processingTimeMs"] = time_diff * 1000
result_dict["index_name"] = index_name
result_dict["items"] = new_items
return result_dict
@@ -696,6 +730,7 @@ def delete_documents(config: Config, index_name: str, doc_ids: List[str], auto_r
for _id in doc_ids:
validation.validate_id(_id)

# TODO: change to timer()
t0 = datetime.datetime.utcnow()
delete_res_backend = HttpRequests(config=config).post(
path=f"{index_name}/_delete_by_query", body={
@@ -765,7 +800,7 @@ def search(config: Config, index_name: str, text: str, result_count: int = 3, hi
explanation = upper_bound_explanation if max_docs_limit is not None else above_zero_explanation
raise errors.IllegalRequestedDocCount(f"{explanation} Marqo received search result limit of `{result_count}`.")

t0 = datetime.datetime.now()
t0 = timer()

if searchable_attributes is not None:
[validation.validate_field_name(attribute) for attribute in searchable_attributes]
@@ -801,27 +836,33 @@ def search(config: Config, index_name: str, text: str, result_count: int = 3, hi
else:
raise errors.InvalidArgError(f"Search called with unknown search method: {search_method}")

logger.info("reranking using {}".format(reranker))

if reranker is not None:
logger.info("reranking using {}".format(reranker))
if searchable_attributes is None:
raise errors.InvalidArgError(f"searchable_attributes cannot be None when re-ranking. Specify which fields to search and rerank over.")
try:
# SEARCH TIMER-LOGGER (reranking)
start_rerank_time = timer()
rerank.rerank_search_results(search_result=search_result, query=text,
model_name=reranker, device=config.indexing_device if device is None else device,
searchable_attributes=searchable_attributes, num_highlights=1 if simplified_format else num_highlights)
end_rerank_time = timer()
total_rerank_time = end_rerank_time - start_rerank_time
logger.info(f"search ({search_method.lower()}) reranking using {reranker}: took {(total_rerank_time):.3f}s to rerank results.")
except Exception as e:
raise errors.BadRequestError(f"reranking failure due to {str(e)}")

time_taken = datetime.datetime.now() - t0
search_result["processingTimeMs"] = round(time_taken.total_seconds() * 1000)

search_result["query"] = text
search_result["limit"] = result_count

if not highlights:
for hit in search_result["hits"]:
del hit["_highlights"]

time_taken = timer() - t0
search_result["processingTimeMs"] = round(time_taken * 1000)
logger.info(f"search ({search_method.lower()}) completed with total processing time: {(time_taken):.3f}s.")

return search_result
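
The start/stop/log pattern above repeats for pre-processing, reranking, and the final total. One compact way to express it, shown purely as an illustrative alternative (this PR keeps explicit timer() calls), is a small context manager:

from contextlib import contextmanager
from timeit import default_timer as timer
import logging

logger = logging.getLogger(__name__)

@contextmanager
def log_time(label):
    start = timer()
    try:
        yield
    finally:
        logger.info(f"{label}: took {timer() - start:.3f}s")

# Hypothetical usage:
# with log_time("search (tensor) pre-processing"):
#     build_query(...)
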

@@ -847,6 +888,7 @@ def _lexical_search(
Notes:
Should not be directly called by client - the search() method should
be called. The search() method adds syncing
Uses normal search (not multiple search).
TODO:
- Test raise_for_searchable_attribute=False
"""
@@ -855,6 +897,8 @@
f"Query arg must be of type str! text arg is of type {type(text)}. "
f"Query arg: {text}")

# SEARCH TIMER-LOGGER (pre-processing)
start_preprocess_time = timer()
if searchable_attributes is not None and searchable_attributes:
fields_to_search = searchable_attributes
else:
@@ -874,7 +918,7 @@
],
}
},
"size": result_count,
"size": result_count
}
if filter_string is not None:
body["query"]["bool"]["filter"] = [{
@@ -886,15 +930,37 @@
body["_source"] = dict()
if body["_source"] is not False:
body["_source"]["exclude"] = [f"*{TensorField.vector_prefix}*"]

end_preprocess_time = timer()
total_preprocess_time = end_preprocess_time - start_preprocess_time
logger.info(f"search (lexical) pre-processing: took {(total_preprocess_time):.3f}s to process query.")

# SEARCH TIMER-LOGGER (roundtrip)
start_search_http_time = timer()
search_res = HttpRequests(config).get(path=f"{index_name}/_search", body=body)

end_search_http_time = timer()
total_search_http_time = end_search_http_time - start_search_http_time
total_os_process_time = search_res["took"] * 0.001
num_results = len(search_res['hits']['hits'])
logger.info(f"search (lexical) roundtrip: took {(total_search_http_time):.3f}s to send search query (roundtrip) to Marqo-os and received {num_results} results.")
logger.info(f" search (lexical) Marqo-os processing time: took {(total_os_process_time):.3f}s for Marqo-os to execute the search.")

# SEARCH TIMER-LOGGER (post-processing)
start_postprocess_time = timer()

res_list = []
for doc in search_res['hits']['hits']:
just_doc = _clean_doc(doc["_source"].copy()) if "_source" in doc else dict()
if return_doc_ids:
just_doc["_id"] = doc["_id"]
just_doc["_score"] = doc["_score"]
res_list.append({**just_doc, "_highlights": []})

end_postprocess_time = timer()
total_postprocess_time = end_postprocess_time - start_postprocess_time
logger.info(f"search (lexical) post-processing: took {(total_postprocess_time):.3f}s to format {len(res_list)} results.")

return {'hits': res_list}
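
The roundtrip logging above separates wall-clock HTTP time from engine-side time: the 'took' field that OpenSearch returns is in milliseconds, hence the * 0.001. A minimal sketch of that split, using a fabricated response dict for illustration:

from timeit import default_timer as timer

start = timer()
search_res = {"took": 42, "hits": {"hits": []}}   # fabricated stand-in for the HTTP response
roundtrip_s = timer() - start                     # wall clock: network + serialisation + engine
engine_s = search_res["took"] * 0.001             # OpenSearch reports 'took' in milliseconds
print(f"roundtrip: {roundtrip_s:.3f}s, engine: {engine_s:.3f}s, "
      f"hits: {len(search_res['hits']['hits'])}")
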


@@ -921,7 +987,7 @@ def _vector_text_search(
Returns:

Note:
- looks for k results in each attribute. Not that much of a concern unless you have a
- uses multisearch, which returns k results in each attribute. Not that much of a concern unless you have a
ridiculous number of attributes
- Should not be directly called by client - the search() method should
be called. The search() method adds syncing
@@ -944,6 +1010,8 @@ def _vector_text_search(
"filtering not yet implemented for S2Search cloud!"
)

# SEARCH TIMER-LOGGER (pre-processing)
start_preprocess_time = timer()
try:
index_info = get_index_info(config=config, index_name=index_name)
except KeyError as e:
@@ -1044,14 +1112,30 @@ def _vector_text_search(
# empty body means that there are no vector fields associated with the index.
# This probably means the index is empty
return {"hits": []}

end_preprocess_time = timer()
total_preprocess_time = end_preprocess_time - start_preprocess_time
logger.info(f"search (tensor) pre-processing: took {(total_preprocess_time):.3f}s to vectorize and process query.")

# SEARCH TIMER-LOGGER (roundtrip)
start_search_http_time = timer()
response = HttpRequests(config).get(path=F"{index_name}/_msearch", body=utils.dicts_to_jsonl(body))

if verbose:
logger.info(f'Opensearch reported {response["took"]}ms search latency')

end_search_http_time = timer()
total_search_http_time = end_search_http_time - start_search_http_time
total_os_process_time = response["took"] * 0.001
num_responses = len(response["responses"])
logger.info(f"search (tensor) roundtrip: took {(total_search_http_time):.3f}s to send {num_responses} search queries (roundtrip) to Marqo-os.")

try:
responses = [r['hits']['hits'] for r in response["responses"]]

# SEARCH TIMER-LOGGER (Log number of results and time for each search in multisearch)
for i in range(len(vector_properties_to_search)):
indiv_responses = response["responses"][i]['hits']['hits']
indiv_query_time = response["responses"][i]["took"] * 0.001
logger.info(f" search (tensor) Marqo-os processing time (search field = {list(vector_properties_to_search)[i]}): took {(indiv_query_time):.3f}s and received {len(indiv_responses)} hits.")

except KeyError as e:
# KeyError indicates we have received a non-successful result
try:
@@ -1067,6 +1151,10 @@
except (KeyError, IndexError) as e2:
raise e

logger.info(f" search (tensor) Marqo-os processing time: took {(total_os_process_time):.3f}s for Marqo-os to execute the search.")

# SEARCH TIMER-LOGGER (post-processing)
start_postprocess_time = timer()
gathered_docs = dict()

if verbose:
@@ -1154,10 +1242,14 @@ def format_ordered_docs_simple(ordered_docs_w_chunks: List[dict]) -> dict:
return {"hits": simple_results[:result_count]}

if simplified_format:
return format_ordered_docs_simple(ordered_docs_w_chunks=completely_sorted)
res = format_ordered_docs_simple(ordered_docs_w_chunks=completely_sorted)
else:
return format_ordered_docs_preserving(ordered_docs_w_chunks=completely_sorted, num_highlights=number_of_highlights)
res = format_ordered_docs_preserving(ordered_docs_w_chunks=completely_sorted, num_highlights=number_of_highlights)

end_postprocess_time = timer()
total_postprocess_time = end_postprocess_time - start_postprocess_time
logger.info(f"search (tensor) post-processing: took {(total_postprocess_time):.3f}s to sort and format {len(completely_sorted)} results from Marqo-os.")
return res
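
For the multisearch branch above, each entry of response["responses"] carries its own 'took' and hit list, which is what the per-field loop logs. A sketch over a fabricated _msearch-style response (field names are assumptions, for illustration only):

# Fabricated response; field names are hypothetical.
vector_properties_to_search = {"__vector_Title": {}, "__vector_Description": {}}
response = {"responses": [
    {"took": 12, "hits": {"hits": [{"_id": "doc1"}]}},
    {"took": 7,  "hits": {"hits": []}},
]}

for field, resp in zip(vector_properties_to_search, response["responses"]):
    indiv_hits = resp["hits"]["hits"]
    indiv_query_time = resp["took"] * 0.001       # per-query engine time, in seconds
    print(f"search field {field}: {indiv_query_time:.3f}s, {len(indiv_hits)} hits")
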

def check_health(config: Config):
TIMEOUT = 3