python[minor]: update evaluate to be concurrent (#1345)
Co-authored-by: Bagatur <baskaryan@gmail.com>
isahers1 and baskaryan authored Jan 21, 2025
1 parent 4f247f9 commit b812149
Showing 5 changed files with 400 additions and 114 deletions.
249 changes: 217 additions & 32 deletions python/langsmith/evaluation/_arunner.py
@@ -5,6 +5,7 @@
 import asyncio
 import concurrent.futures as cf
 import datetime
+import io
 import logging
 import pathlib
 import uuid
@@ -47,7 +48,6 @@
     _load_experiment,
     _load_tqdm,
     _load_traces,
-    _make_fresh_examples,
     _resolve_data,
     _resolve_evaluators,
     _resolve_experiment,
@@ -480,7 +480,13 @@ async def _aevaluate(
         num_repetitions=num_repetitions,
         runs=runs,
         include_attachments=_include_attachments(target)
-        or _evaluators_include_attachments(evaluators),
+        or _evaluators_include_attachments(evaluators) > 0,
+        reuse_attachments=num_repetitions
+        * (
+            int(_include_attachments(target))
+            + _evaluators_include_attachments(evaluators)
+        )
+        > 1,
         upload_results=upload_results,
     ).astart()
     cache_dir = ls_utils.get_cache_dir(None)
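The `reuse_attachments` flag above is derived arithmetically: attachment bytes must be reusable whenever they will be read more than once. A minimal sketch of that predicate, assuming `_evaluators_include_attachments` returns a count of attachment-consuming evaluators (consistent with the `> 0` comparison and the `+` in this hunk); the function name `needs_reuse` is hypothetical:

```python
# Hypothetical restatement of the reuse_attachments expression above.
def needs_reuse(num_repetitions: int, target_uses_attachments: bool,
                n_attachment_evaluators: int) -> bool:
    consumers = int(target_uses_attachments) + n_attachment_evaluators
    return num_repetitions * consumers > 1

assert not needs_reuse(1, True, 0)  # one consumer, one pass: read once, no reuse
assert needs_reuse(1, True, 1)      # target and an evaluator share the same bytes
assert needs_reuse(2, True, 0)      # two repetitions re-read the same attachment
```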
@@ -491,15 +497,24 @@ async def _aevaluate(
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
         results = AsyncExperimentResults(manager)
         if blocking:
             await results.wait()
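With this change, an async target combined with evaluators routes through the new single-pipeline path instead of two sequential passes. A hedged usage sketch (the dataset name and the target/evaluator bodies are placeholders; `aevaluate` is assumed to be the public entry point that reaches `_aevaluate`):

```python
import asyncio
from langsmith import aevaluate  # assumed public wrapper around _aevaluate

async def target(inputs: dict) -> dict:
    # Placeholder target; any async callable triggers the async code path.
    return {"answer": inputs["question"].upper()}

def exact_match(run, example) -> dict:
    # Placeholder evaluator comparing the run output to the reference output.
    return {"key": "exact_match", "score": run.outputs == example.outputs}

async def main() -> None:
    # An async target plus evaluators exercises awith_predictions_and_evaluators.
    await aevaluate(
        target,
        data="my-dataset",  # placeholder dataset name
        evaluators=[exact_match],
        max_concurrency=4,
    )

asyncio.run(main())
```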
@@ -528,6 +543,18 @@ class _AsyncExperimentManager(_ExperimentManagerMixin):
             results for the experiment.
         summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
             for the experiment.
+        num_repetitions (Optional[int], default=1): The number of repetitions for
+            the experiment.
+        include_attachments (Optional[bool], default=False): Whether to include
+            attachments. This is used for when we pull the examples for the experiment.
+        reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
+            from examples. This is True if we need to reuse attachments across multiple
+            target/evaluator functions.
+        upload_results (Optional[bool], default=True): Whether to upload results
+            to Langsmith.
+        attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data
+            for attachments. Only used if we reuse attachments across multiple
+            target/evaluator functions.
     """
 
     def __init__(
@@ -543,7 +570,9 @@ def __init__(
         description: Optional[str] = None,
         num_repetitions: int = 1,
         include_attachments: bool = False,
+        reuse_attachments: bool = False,
         upload_results: bool = True,
+        attachment_raw_data_dict: Optional[dict] = None,
     ):
         super().__init__(
             experiment=experiment,
@@ -560,7 +589,54 @@ def __init__(
         self._summary_results = summary_results
         self._num_repetitions = num_repetitions
         self._include_attachments = include_attachments
+        self._reuse_attachments = reuse_attachments
         self._upload_results = upload_results
+        self._attachment_raw_data_dict = attachment_raw_data_dict
+
+    def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
+        """Reset attachment readers for an example.
+
+        This is only in the case that an attachment is going to be used by more
+        than 1 callable (target + evaluators). In that case we keep a single copy
+        of the attachment data in self._attachment_raw_data_dict, and create
+        readers from that data. This makes it so that we don't have to keep
+        copies of the same data in memory, instead we can just create readers
+        from the same data.
+        """
+        if not hasattr(example, "attachments") or not example.attachments:
+            return example
+
+        new_attachments: dict[str, schemas.AttachmentInfo] = {}
+        for name, attachment in example.attachments.items():
+            if (
+                self._attachment_raw_data_dict is not None
+                and str(example.id) + name in self._attachment_raw_data_dict
+            ):
+                new_attachments[name] = {
+                    "presigned_url": attachment["presigned_url"],
+                    "reader": io.BytesIO(
+                        self._attachment_raw_data_dict[str(example.id) + name]
+                    ),
+                    "mime_type": attachment["mime_type"],
+                }
+            else:
+                new_attachments[name] = attachment
+
+        # Create a new Example instance with the updated attachments
+        return schemas.Example(
+            id=example.id,
+            created_at=example.created_at,
+            dataset_id=example.dataset_id,
+            inputs=example.inputs,
+            outputs=example.outputs,
+            metadata=example.metadata,
+            modified_at=example.modified_at,
+            runs=example.runs,
+            source_run_id=example.source_run_id,
+            attachments=new_attachments,
+            _host_url=example._host_url,
+            _tenant_id=example._tenant_id,
+        )
 
     async def aget_examples(self) -> AsyncIterator[schemas.Example]:
         if self._examples is None:
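The method above is the crux of attachment reuse: one `bytes` payload held in `_attachment_raw_data_dict`, with a fresh `io.BytesIO` reader handed to each consumer. A self-contained sketch of why that avoids both duplicated buffers and exhausted readers:

```python
import io

raw = b"attachment-bytes"  # single in-memory copy, as in _attachment_raw_data_dict

reader_a = io.BytesIO(raw)  # e.g. handed to the target function
reader_b = io.BytesIO(raw)  # e.g. handed to an evaluator

assert reader_a.read() == raw
# reader_a is now exhausted, but reader_b is independent and still readable:
assert reader_b.read() == raw
```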
@@ -569,11 +645,23 @@ async def aget_examples(self) -> AsyncIterator[schemas.Example]:
                 client=self.client,
                 include_attachments=self._include_attachments,
             )
+            if self._reuse_attachments and self._attachment_raw_data_dict is None:
+                examples_copy, self._examples = aitertools.atee(self._examples)
+                self._attachment_raw_data_dict = {
+                    str(e.id) + name: value["reader"].read()
+                    async for e in examples_copy
+                    for name, value in (e.attachments or {}).items()
+                }
             if self._num_repetitions > 1:
                 examples_list = [example async for example in self._examples]
                 self._examples = async_chain_from_iterable(
                     [
-                        async_iter_from_list(_make_fresh_examples(examples_list))
+                        async_iter_from_list(
+                            [
+                                self._reset_example_attachments(example)
+                                for example in examples_list
+                            ]
+                        )
                         for _ in range(self._num_repetitions)
                     ]
                 )
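`aget_examples` drains each attachment reader exactly once (via the `atee`'d copy of the stream) and caches the raw bytes under the key `str(example.id) + name`. A sketch of that key scheme with stand-in data (plain dicts rather than real `schemas.Example` objects):

```python
from io import BytesIO

# Stand-ins for schemas.Example objects, one attachment on the first example.
examples = [
    {"id": "ex-1", "attachments": {"image": {"reader": BytesIO(b"png-bytes")}}},
    {"id": "ex-2", "attachments": {}},
]

attachment_raw_data_dict = {
    str(e["id"]) + name: value["reader"].read()
    for e in examples
    for name, value in (e["attachments"] or {}).items()
}

assert attachment_raw_data_dict == {"ex-1image": b"png-bytes"}
```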
@@ -639,6 +727,104 @@ async def astart(self) -> _AsyncExperimentManager:
             runs=self._runs,
             evaluation_results=self._evaluation_results,
             include_attachments=self._include_attachments,
+            reuse_attachments=self._reuse_attachments,
             upload_results=self._upload_results,
+            attachment_raw_data_dict=self._attachment_raw_data_dict,
         )
 
+    def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example:
+        new_attachments: dict[str, schemas.AttachmentInfo] = {}
+        for name, attachment in (example.attachments or {}).items():
+            if (
+                self._attachment_raw_data_dict is not None
+                and str(example.id) + name in self._attachment_raw_data_dict
+            ):
+                reader = io.BytesIO(
+                    self._attachment_raw_data_dict[str(example.id) + name]
+                )
+                new_attachments[name] = {
+                    "presigned_url": attachment["presigned_url"],
+                    "reader": reader,
+                    "mime_type": attachment["mime_type"],
+                }
+            else:
+                new_attachments[name] = attachment
+
+        return schemas.Example(
+            id=example.id,
+            created_at=example.created_at,
+            dataset_id=example.dataset_id,
+            inputs=example.inputs,
+            outputs=example.outputs,
+            metadata=example.metadata,
+            modified_at=example.modified_at,
+            runs=example.runs,
+            source_run_id=example.source_run_id,
+            attachments=new_attachments,
+            _host_url=example._host_url,
+            _tenant_id=example._tenant_id,
+        )
+
+    async def awith_predictions_and_evaluators(
+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, "_evaluator_executor"):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
+
+        async def process_examples():
+            """Create a single task per example.
+
+            That task is to run the target function and all the evaluators
+            sequentially.
+            """
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {
+                        "run": run,
+                        "example": example,
+                        "evaluation_results": {"results": []},
+                    },
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        # Run the per-example tasks with max-concurrency
+        # This guarantees that max_concurrency is the upper limit
+        # for the number of target/evaluators that can be run in parallel
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
+
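`aiter_with_concurrency` and `atee` are internal `aitertools` helpers, so their exact semantics are not shown here; the load-shaping idea is a cap on in-flight per-example tasks, each of which runs the target and then its evaluators. A rough stdlib analogue (using `gather`, which collects results rather than streaming them as the real pipeline does):

```python
import asyncio

async def process_example(i: int, sem: asyncio.Semaphore) -> dict:
    # One task per example: target call, then its evaluators, sequentially.
    async with sem:
        await asyncio.sleep(0.01)  # stand-in for target + evaluator work
        return {"example": i, "run": i, "evaluation_results": {"results": []}}

async def main() -> None:
    sem = asyncio.Semaphore(4)  # plays the role of max_concurrency
    rows = await asyncio.gather(*(process_example(i, sem) for i in range(10)))
    assert len(rows) == 10

asyncio.run(main())
```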

@@ -740,7 +926,7 @@ async def predict_all():
                 # Yield the coroutine to be awaited later
                 yield _aforward(
                     fn,
-                    example,
+                    self._get_example_with_readers(example),
                     self.experiment_name,
                     self._metadata,
                     self.client,
@@ -796,19 +982,22 @@ async def _arun_evaluators(
         run = current_results["run"]
         example = current_results["example"]
         eval_results = current_results["evaluation_results"]
-        for evaluator in evaluators:
+
+        async def _run_single_evaluator(evaluator):
             try:
                 evaluator_response = await evaluator.aevaluate_run(
                     run=run,
-                    example=example,
+                    example=self._get_example_with_readers(example),
                 )
-                eval_results["results"].extend(
-                    self.client._select_eval_results(evaluator_response)
+                selected_results = self.client._select_eval_results(
+                    evaluator_response
                 )
+
                 if self._upload_results:
                     self.client._log_evaluation_feedback(
                         evaluator_response, run=run, _executor=executor
                     )
+                return selected_results
             except Exception as e:
                 try:
                     feedback_keys = _extract_feedback_keys(evaluator)
@@ -824,13 +1013,14 @@ async def _arun_evaluators(
                             for key in feedback_keys
                         ]
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(error_response)
+                    selected_results = self.client._select_eval_results(
+                        error_response
                     )
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             error_response, run=run, _executor=executor
                         )
+                    return selected_results
                 except Exception as e2:
                     logger.debug(f"Error parsing feedback keys: {e2}")
                     pass
@@ -839,15 +1029,14 @@ async def _arun_evaluators(
                     f" run {run.id}: {repr(e)}",
                     exc_info=True,
                 )
-                logger.error(
-                    f"Error running evaluator {repr(evaluator)} on"
-                    f" run {run.id}: {repr(e)}",
-                    exc_info=True,
-                )
-            if example.attachments is not None:
-                for attachment in example.attachments:
-                    reader = example.attachments[attachment]["reader"]
-                    reader.seek(0)
+
+        all_results = []
+        for evaluator in evaluators:
+            all_results.append(await _run_single_evaluator(evaluator))
+
+        for result in all_results:
+            if result is not None:
+                eval_results["results"].extend(result)
         return ExperimentResultRow(
             run=run,
             example=example,
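Evaluators for a given example now run sequentially via `_run_single_evaluator`, and a failing evaluator contributes `None` instead of aborting the row; results are flattened afterwards. A compact sketch of that error-tolerant collection pattern (the evaluator bodies are placeholders for `evaluator.aevaluate_run(...)`):

```python
import asyncio

async def run_single(evaluator) -> list | None:
    try:
        return [evaluator()]  # stand-in for evaluator.aevaluate_run(...)
    except Exception:
        return None  # mirrors the diff: errors are logged, not raised

async def main() -> None:
    evaluators = [lambda: "ok", lambda: 1 / 0, lambda: "also ok"]
    all_results = [await run_single(e) for e in evaluators]  # sequential
    flattened = [r for rs in all_results if rs is not None for r in rs]
    assert flattened == ["ok", "also ok"]

asyncio.run(main())
```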
@@ -1064,10 +1253,6 @@ def _get_run(r: run_trees.RunTree) -> None:
                 client=client,
             ),
         )
-        if include_attachments and example.attachments is not None:
-            for attachment in example.attachments:
-                reader = example.attachments[attachment]["reader"]
-                reader.seek(0)
     except Exception as e:
         logger.error(
             f"Error running target function: {e}", exc_info=True, stacklevel=1