Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return metrics #506

Merged
merged 17 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/marqo/s2_inference/clip_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from marqo.s2_inference.processing.custom_clip_utils import HFTokenizer, download_model
from torchvision.transforms import InterpolationMode
from marqo.s2_inference.configs import ModelCache
from marqo.tensor_search.telemetry import RequestMetrics, RequestMetricsStore

logger = get_logger(__name__)

Expand Down Expand Up @@ -79,7 +80,7 @@ def format_and_load_CLIP_images(images: List[Union[str, ndarray, ImageType]], im
return results


def load_image_from_path(image_path: str, image_download_headers: dict, timeout=3) -> ImageType:
def load_image_from_path(image_path: str, image_download_headers: dict, timeout=3, metrics_obj: Optional[RequestMetrics] = None) -> ImageType:
"""Loads an image into PIL from a string path that is either local or a url

Args:
Expand All @@ -97,7 +98,13 @@ def load_image_from_path(image_path: str, image_download_headers: dict, timeout=
img = Image.open(image_path)
elif validators.url(image_path):
try:
if metrics_obj is not None:
metrics_obj.start(f"image_download.{image_path}")

resp = requests.get(image_path, stream=True, timeout=timeout, headers=image_download_headers)

if metrics_obj is not None:
metrics_obj.stop(f"image_download.{image_path}")
except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError,
requests.exceptions.RequestException
) as e:
Expand Down
1 change: 0 additions & 1 deletion src/marqo/s2_inference/hf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def encode(self, sentence: Union[str, List[str]], normalize=True, **kwargs) -> U
self.load()

self.model.normalize = normalize

inputs = self.tokenizer(sentence, padding=True, truncation=True, max_length=self.max_seq_length,
return_tensors="pt").to(self.device)

Expand Down
115 changes: 86 additions & 29 deletions src/marqo/tensor_search/add_docs.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
"""Functions used to fulfill the add_documents endpoint"""
import copy
import functools
import math
import threading
import warnings
from typing import List, Tuple
import random

from typing import List, Optional, Tuple
import PIL
from marqo.s2_inference import clip_utils
from marqo.tensor_search.telemetry import RequestMetricsStore, RequestMetrics


def threaded_download_images(allocated_docs: List[dict], image_repo: dict,
non_tensor_fields: Tuple, image_download_headers: dict) -> None:
non_tensor_fields: Tuple, image_download_headers: dict, metric_obj: Optional[RequestMetrics] = None) -> None:
"""A thread calls this function to download images for its allocated documents

This should be called only if treat URLs as images is True.
Expand All @@ -35,31 +36,45 @@ def threaded_download_images(allocated_docs: List[dict], image_repo: dict,
Returns:
None
"""
# Generate pseudo-unique ID for thread metrics.
_id = hash("".join([d.get("_id", str(random.getrandbits(64))) for d in allocated_docs])) % 1000
_id = f"image_download.{_id}"
TIMEOUT_SECONDS=3
for doc in allocated_docs:
for field in list(doc):
if field in non_tensor_fields:
continue
if isinstance(doc[field], str) and clip_utils._is_image(doc[field]):
if doc[field] in image_repo:
continue
try:
image_repo[doc[field]] = clip_utils.load_image_from_path(doc[field], image_download_headers, timeout=TIMEOUT_SECONDS)
except PIL.UnidentifiedImageError as e:
image_repo[doc[field]] = e
if metric_obj is None: # Occurs predominately in testing.
metric_obj = RequestMetricsStore.for_request()
RequestMetricsStore.set_in_request(metrics=metric_obj)

with metric_obj.time(f"{_id}.thread_time"):
for doc in allocated_docs:
for field in list(doc):
if field in non_tensor_fields:
continue
# For multimodal tensor combination
elif isinstance(doc[field], dict):
for sub_field in list(doc[field].values()):
if isinstance(sub_field, str) and clip_utils._is_image(sub_field):
if sub_field in image_repo:
continue
try:
image_repo[sub_field] = clip_utils.load_image_from_path(sub_field, image_download_headers,
timeout=TIMEOUT_SECONDS)
except PIL.UnidentifiedImageError as e:
image_repo[sub_field] = e
continue
if isinstance(doc[field], str) and clip_utils._is_image(doc[field]):
if doc[field] in image_repo:
continue
try:
image_repo[doc[field]] = clip_utils.load_image_from_path(doc[field], image_download_headers, timeout=TIMEOUT_SECONDS, metrics_obj=metric_obj)
except PIL.UnidentifiedImageError as e:
image_repo[doc[field]] = e
metric_obj.increment_counter(f"{doc.get(field, '')}.UnidentifiedImageError")
continue
# For multimodal tensor combination
elif isinstance(doc[field], dict):
for sub_field in list(doc[field].values()):
if isinstance(sub_field, str) and clip_utils._is_image(sub_field):
if sub_field in image_repo:
continue
try:
image_repo[sub_field] = clip_utils.load_image_from_path(
sub_field,
image_download_headers,
timeout=TIMEOUT_SECONDS,
metrics_obj=metric_obj
)
except PIL.UnidentifiedImageError as e:
image_repo[sub_field] = e
metric_obj.increment_counter(f"{doc.get(field, '')}.UnidentifiedImageError")
continue


def download_images(docs: List[dict], thread_count: int, non_tensor_fields: Tuple, image_download_headers: dict) -> dict:
Expand All @@ -79,13 +94,55 @@ def download_images(docs: List[dict], thread_count: int, non_tensor_fields: Tupl
docs_per_thread = math.ceil(len(docs)/thread_count)
copied = copy.deepcopy(docs)
image_repo = dict()

m = [RequestMetrics() for i in range(thread_count)]
thread_allocated_docs = [copied[i: i + docs_per_thread] for i in range(len(copied))[::docs_per_thread]]
threads = [threading.Thread(target=threaded_download_images, args=(allocation, image_repo, non_tensor_fields, image_download_headers))
for allocation in thread_allocated_docs]
threads = [threading.Thread(target=threaded_download_images, args=(allocation, image_repo, non_tensor_fields, image_download_headers, m[i]))
for i, allocation in enumerate(thread_allocated_docs)]

for th in threads:
th.start()

for th in threads:
th.join()

# Fix up metric_obj to make it not mention thread-ids
metric_obj = RequestMetricsStore.for_request()
metric_obj = RequestMetrics.reduce_from_list([metric_obj] + m)
metric_obj.times = reduce_thread_metrics(metric_obj.times)
return image_repo

def reduce_thread_metrics(data):
"""Reduce the metrics from each thread, as if they were run in a single thread.

e.g.
```
{
"image_download.700.thread_time": 1373.271582997404,
"image_download.700.https://www.ai-nc.com/images/pages/heat-map.png": 52.985392,
"image_download.729.thread_time": 53.297404,
"image_download.729.https://www.ai-nc.com/images/pages/heat-map.png": 2052.617332985392,
}
```
Becomes
```
{
"image_download.thread_time": [1373.271582997404, 53.297404],
"image_download.https://www.ai-nc.com/images/pages/heat-map.png": [2052.617332985392, 52.985392],
}
```
Only applies to times that start with `image_download`.
"""
result = {}
for key, value in data.items():
if key.startswith("image_download."):
parts = key.split('.')
new_key = '.'.join(parts[0:1] + parts[2:]) if parts[1] != 'full_time' else key
if new_key in result:
if isinstance(result[new_key], list):
result[new_key].append(value)
else:
result[new_key] = [result[new_key], value]
else:
result[new_key] = value
return result
46 changes: 27 additions & 19 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from marqo.tensor_search.utils import add_timing
import pydantic

from marqo.tensor_search.telemetry import RequestMetricsStore, TelemetryMiddleware


def replace_host_localhosts(OPENSEARCH_IS_INTERNAL: str, OS_URL: str):
"""Replaces a host's localhost URL with one that can be referenced from
Expand Down Expand Up @@ -56,6 +58,7 @@ def replace_host_localhosts(OPENSEARCH_IS_INTERNAL: str, OS_URL: str):
title="Marqo",
version=version.get_version()
)
app.add_middleware(TelemetryMiddleware)


def generate_config() -> config.Config:
Expand Down Expand Up @@ -136,26 +139,30 @@ def create_index(index_name: str, settings: Dict = None, marqo_config: config.Co
@throttle(RequestType.SEARCH)
@add_timing
def bulk_search(query: BulkSearchQuery, device: str = Depends(api_validation.validate_device), marqo_config: config.Config = Depends(generate_config)):
return tensor_search.bulk_search(query, marqo_config, device=device)
with RequestMetricsStore.for_request().time(f"POST /indexes/bulk/search"):
return tensor_search.bulk_search(query, marqo_config, device=device)

@app.post("/indexes/{index_name}/search")
@throttle(RequestType.SEARCH)
def search(search_query: SearchQuery, index_name: str, device: str = Depends(api_validation.validate_device),
marqo_config: config.Config = Depends(generate_config)):
return tensor_search.search(
config=marqo_config, text=search_query.q,
index_name=index_name, highlights=search_query.showHighlights,
searchable_attributes=search_query.searchableAttributes,
search_method=search_query.searchMethod,
result_count=search_query.limit, offset=search_query.offset,
reranker=search_query.reRanker,
filter=search_query.filter, device=device,
attributes_to_retrieve=search_query.attributesToRetrieve, boost=search_query.boost,
image_download_headers=search_query.image_download_headers,
context=search_query.context,
score_modifiers=search_query.scoreModifiers,
model_auth=search_query.modelAuth
)

with RequestMetricsStore.for_request().time(f"POST /indexes/{index_name}/search"):
return tensor_search.search(
config=marqo_config, text=search_query.q,
index_name=index_name, highlights=search_query.showHighlights,
searchable_attributes=search_query.searchableAttributes,
search_method=search_query.searchMethod,
result_count=search_query.limit, offset=search_query.offset,
reranker=search_query.reRanker,
filter=search_query.filter, device=device,
attributes_to_retrieve=search_query.attributesToRetrieve, boost=search_query.boost,
image_download_headers=search_query.image_download_headers,
context=search_query.context,
score_modifiers=search_query.scoreModifiers,
model_auth=search_query.modelAuth
)



@app.post("/indexes/{index_name}/documents")
Expand Down Expand Up @@ -184,10 +191,11 @@ def add_or_replace_documents(
use_existing_tensors=use_existing_tensors, image_download_headers=image_download_headers,
mappings=mappings, model_auth=model_auth
)
return tensor_search.add_documents_orchestrator(
config=marqo_config, add_docs_params=add_docs_params,
batch_size=batch_size, processes=processes
)
with RequestMetricsStore.for_request().time(f"POST /indexes/{index_name}/documents"):
return tensor_search.add_documents_orchestrator(
config=marqo_config, add_docs_params=add_docs_params,
batch_size=batch_size, processes=processes
)


@app.put("/indexes/{index_name}/documents")
Expand Down
42 changes: 34 additions & 8 deletions src/marqo/tensor_search/parallel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import random
import time
import json
from typing import List, Dict, Optional
from typing import Any, List, Dict, Optional, Tuple, Union
import copy
from fastapi import Request
import torch
import numpy as np
from torch import multiprocessing as mp
Expand All @@ -12,6 +14,7 @@
from marqo.tensor_search.models.add_docs_objects import AddDocsParams
from dataclasses import replace
from marqo.config import Config
from marqo.tensor_search.telemetry import RequestMetrics, RequestMetricsStore, Timer


try:
Expand Down Expand Up @@ -84,6 +87,7 @@ def get_device_ids(n_processes: int, device: str):

raise ValueError(f"expected on of 'cpu', 'cuda' or 'cuda:#' but received {device}")


class IndexChunk:

"""wrapper to pass through documents to be indexed to multiprocessing
Expand All @@ -93,10 +97,13 @@ def __init__(
self,
add_docs_params: AddDocsParams,
config: Config,
request_metric: RequestMetrics,
batch_size: int = 50,
process_id: int = 0,
threads_per_process: int = None):

threads_per_process: int = None,
):
self.request_metric = request_metric

self.config = copy.deepcopy(config)
self.add_docs_params = add_docs_params
self.n_batch = batch_size
Expand All @@ -106,7 +113,15 @@ def __init__(
self.config.indexing_device = add_docs_params.device if add_docs_params.device is not None else self.config.indexing_device
self.threads_per_process = threads_per_process

def process(self):
def process(self) -> Tuple[List[Dict[str, Any]], RequestMetrics]:
# Generate pseudo-unique ID for thread metrics.
_id = hash("".join([d.get("_id", str(random.getrandbits(64))) for d in self.add_docs_params.docs])) % 1000

# We set a temporary key in the thread to allow it to reference `self.request_metric`
RequestMetricsStore.set_in_request(
r=Request(scope={"type": "http", "unique_metrics_key": f"IndexChunk.{_id}"}),
metrics=self.request_metric
)

# hf tokenizers setting
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
Expand Down Expand Up @@ -142,15 +157,15 @@ def process(self):

end = time.time()
logger.info(f'took {end - start} sec for {self.n_docs} documents')
return results
return (results, self.request_metric)

@staticmethod
def _calculate_percent_done(current_step, total_steps, rounding=0):
percent = current_step/max(1e-9,total_steps)
return round(100*percent, rounding)


def _run_chunker(chunker: IndexChunk):
def _run_chunker(chunker: IndexChunk) -> Tuple[List[Dict[str, Any]], RequestMetrics]:
"""helper function to run the multiprocess by activating the chunker
Args:
chunker (IndexChunk): _description_
Expand Down Expand Up @@ -202,16 +217,27 @@ def add_documents_mp(
device_ids = get_device_ids(n_processes, selected_device)

start = time.time()
initial_metrics = RequestMetricsStore.for_request()
request = RequestMetricsStore._get_request()

chunkers = [
IndexChunk(
request_metric=initial_metrics,
config=config, batch_size=batch_size,
process_id=p_id, threads_per_process=threads_per_process,
add_docs_params=replace(add_docs_params, docs=_docs, device=device_ids[p_id]))
add_docs_params=replace(add_docs_params, docs=_docs, device=device_ids[p_id])
)
for p_id,_docs in enumerate(np.array_split(add_docs_params.docs, n_processes))]
logger.info(f'Performing parallel now across devices {device_ids}...')

with mp.Pool(n_processes) as pool:
results = pool.map(_run_chunker, chunkers)
results: List[Tuple[List[Dict[str, Any]], RequestMetrics]] = pool.map(_run_chunker, chunkers)

metrics: List[RequestMetrics] = [r[1] for r in results]
results: List[Tuple[List[Dict[str, Any]]]] = [r[0] for r in results]

RequestMetricsStore.set_in_request(r=request, metrics=RequestMetrics.reduce_from_list(metrics))

end = time.time()
logger.info(f"finished indexing all documents. took {end - start} seconds to index {n_documents} documents")
return results
Loading