Add async scoring methods #109

Merged: 6 commits, Jan 8, 2024
Changes from 3 commits
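For context, a minimal usage sketch of the async entry point this PR adds. Only the arun keyword arguments come from the diff below; the suite construction and the data passed in are illustrative assumptions, not part of the change.

import asyncio

from arthur_bench.run.testsuite import TestSuite


async def main():
    # Hypothetical setup: assumes a test suite named "my_suite" already exists
    # and that its scorer implements arun_batch. Constructor arguments are
    # illustrative, not taken from this PR.
    suite = TestSuite("my_suite", "exact_match")

    # arun mirrors run(): it validates inputs via _pre_run, then awaits the
    # scorer's async batches, dispatched batch_size items at a time.
    # The number of candidate outputs must match the suite's test cases.
    run = await suite.arun(
        run_name="my_async_run",
        candidate_output_list=["candidate output 1", "candidate output 2"],
        batch_size=5,  # ASYNC_BATCH_DEFAULT
        save=True,
    )
    print(run.id)


asyncio.run(main())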
49 changes: 47 additions & 2 deletions arthur_bench/run/testrun.py
@@ -1,6 +1,6 @@
import uuid
from typing import Optional, List
from arthur_bench.models.models import CreateRunRequest
from typing import Optional, List, Union
from arthur_bench.models.models import CreateRunRequest, TestCaseOutput, ScoreResult
from arthur_bench.client.bench_client import BenchClient
from arthur_bench.exceptions import ArthurUserError

@@ -39,3 +39,48 @@ def save(self) -> uuid.UUID:
)
self.id = resp.id
return self.id

@classmethod
def from_flattened(
cls,
run_name: str,
ids: List[uuid.UUID],
candidate_output_list: List[str],
scores: Union[List[float], List[ScoreResult]],
client: BenchClient,
test_suite_id: uuid.UUID,
model_name: Optional[str] = None,
model_version: Optional[str] = None,
foundation_model: Optional[str] = None,
prompt_template: Optional[str] = None,
):
test_case_outputs = []
for i, result in enumerate(scores):
# temporary hack until score field is fully deprecated
score: Optional[float] = (
result if isinstance(result, float) else result.score # type: ignore
)
# we can't properly type this in python3.9. In 3.10 we can switch to
# https://github.com/python/mypy/issues/11934#issuecomment-1008295539
score_result: ScoreResult = (
ScoreResult(score=result) if isinstance(result, float) else result # type: ignore # noqa
) # type: ignore
test_case_outputs.append(
TestCaseOutput(
id=ids[i],
output=candidate_output_list[i],
score=score,
score_result=score_result,
)
)

return cls(
name=run_name,
test_case_outputs=test_case_outputs,
model_name=model_name,
model_version=model_version,
foundation_model=foundation_model,
prompt_template=prompt_template,
test_suite_id=test_suite_id,
client=client,
)
167 changes: 113 additions & 54 deletions arthur_bench/run/testsuite.py
@@ -1,14 +1,13 @@
import logging
import pandas as pd
from typing import List, Optional, Union
import uuid
from typing import List, Optional, Union, Tuple
from arthur_bench.scoring import Scorer
from arthur_bench.models.models import (
TestSuiteRequest,
PaginatedTestSuite,
TestCaseOutput,
ScoringMethodType,
TestCaseResponse,
ScoreResult,
)
from arthur_bench.exceptions import (
UserValueError,
@@ -22,7 +21,7 @@
_initialize_scorer,
)

from arthur_bench.scoring.scorer import SINGLE_ITEM_BATCH_DEFAULT
from arthur_bench.scoring.scorer import SINGLE_ITEM_BATCH_DEFAULT, ASYNC_BATCH_DEFAULT


logger = logging.getLogger(__name__)
@@ -143,6 +142,109 @@ def reference_outputs(self) -> List[Optional[str]]:
def scoring_method(self) -> str:
return self.scorer.name()

def _pre_run(
self,
run_name: str,
candidate_data: Optional[pd.DataFrame] = None,
candidate_data_path: Optional[str] = None,
candidate_column: str = "candidate_output",
candidate_output_list: Optional[List[str]] = None,
context_column: Optional[str] = None,
context_list: Optional[List[str]] = None,
) -> Tuple[
List[str], List[str], List[uuid.UUID], Optional[List[str]], Optional[List[str]]
]:
if self.client.check_run_exists(str(self._data.id), run_name):
raise UserValueError(
f"A test run with the name {run_name} already exists. "
"Give this test run a unique name and re-run."
)

candidate_output_list, context_list = _load_run_data_from_args(
candidate_data=candidate_data,
candidate_data_path=candidate_data_path,
candidate_column=candidate_column,
candidate_output_list=candidate_output_list,
context_column=context_column,
context_list=context_list,
)

if len(candidate_output_list) != len(self.test_cases):
raise UserValueError(
f"candidate data has {len(candidate_output_list)} tests but "
f"expected {len(self.test_cases)} tests"
)

inputs = self.input_texts
ids = [case.id for case in self.test_cases]
# ref outputs should be None if any items are None (we validate nullness must be
# all-or-none)
ref_outputs: Optional[List[str]] = []
if ref_outputs is not None:
for case in self.test_cases:
if case.reference_output is None:
ref_outputs = None
break
else:
ref_outputs.append(case.reference_output)
# TODO: make separate object?
Contributor review comment: With only one caller, I don't think it's a big deal. Definitely an interesting return type hint though!

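(Illustrative aside, not part of this PR: the "# TODO: make separate object?" above could be addressed with a small NamedTuple; the name and fields below are hypothetical and simply mirror the tuple returned here.)

import uuid
from typing import List, NamedTuple, Optional


class PreRunData(NamedTuple):
    # Hypothetical container for _pre_run's return value.
    candidate_output_list: List[str]
    inputs: List[str]
    ids: List[uuid.UUID]
    ref_outputs: Optional[List[str]]
    context_list: Optional[List[str]]
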
return candidate_output_list, inputs, ids, ref_outputs, context_list

async def arun(
self,
run_name: str,
candidate_data: Optional[pd.DataFrame] = None,
candidate_data_path: Optional[str] = None,
candidate_column: str = "candidate_output",
candidate_output_list: Optional[List[str]] = None,
context_column: Optional[str] = None,
context_list: Optional[List[str]] = None,
save: bool = True,
batch_size: int = ASYNC_BATCH_DEFAULT,
model_name: Optional[str] = None,
model_version: Optional[str] = None,
foundation_model: Optional[str] = None,
prompt_template: Optional[str] = None,
) -> TestRun:
candidate_output_list, inputs, ids, ref_outputs, context_list = self._pre_run(
run_name=run_name,
candidate_data=candidate_data,
candidate_data_path=candidate_data_path,
candidate_column=candidate_column,
candidate_output_list=candidate_output_list,
context_column=context_column,
context_list=context_list,
)
try:
all_scores = await self.scorer.arun(
candidate_outputs=candidate_output_list,
reference_outputs=ref_outputs,
inputs=inputs,
contexts=context_list,
batch_size=batch_size,
)
except Exception as e:
logger.error(f"failed to create run: {e}")
raise ArthurInternalError(f"failed to create run {run_name}") from e

run = TestRun.from_flattened(
run_name=run_name,
ids=ids,
candidate_output_list=candidate_output_list,
scores=all_scores,
model_name=model_name,
model_version=model_version,
foundation_model=foundation_model,
prompt_template=prompt_template,
test_suite_id=self._data.id,
client=self.client,
)

if save:
run.save()

return run

def run(
self,
run_name: str,
@@ -183,15 +285,8 @@ def run(
:returns: TestRun object containing scored outputs
"""

# make sure no existing test run named run_name is already attached to this
# suite
if self.client.check_run_exists(str(self._data.id), run_name):
raise UserValueError(
f"A test run with the name {run_name} already exists. "
"Give this test run a unique name and re-run."
)

candidate_output_list, context_list = _load_run_data_from_args(
candidate_output_list, inputs, ids, ref_outputs, context_list = self._pre_run(
run_name=run_name,
candidate_data=candidate_data,
candidate_data_path=candidate_data_path,
candidate_column=candidate_column,
@@ -200,24 +295,6 @@
context_list=context_list,
)

if len(candidate_output_list) != len(self.test_cases):
raise UserValueError(
f"candidate data has {len(candidate_output_list)} tests but "
f"expected {len(self.test_cases)} tests"
)

inputs = self.input_texts
ids = [case.id for case in self.test_cases]
# ref outputs should be None if any items are None (we validate nullness must be
# all-or-none)
ref_outputs: Optional[List[str]] = []
if ref_outputs is not None:
for case in self.test_cases:
if case.reference_output is None:
ref_outputs = None
break
else:
ref_outputs.append(case.reference_output)
try:
all_scores = self.scorer.run(
candidate_output_list,
@@ -230,29 +307,11 @@
logger.error(f"failed to create run: {e}")
raise ArthurInternalError(f"failed to create run {run_name}") from e

test_case_outputs = []
for i, result in enumerate(all_scores):
# temporary hack until score field is fully deprecated
score: Optional[float] = (
result if isinstance(result, float) else result.score # type: ignore
)
# we can't properly type this in python3.9. In 3.10 we can switch to
# https://github.com/python/mypy/issues/11934#issuecomment-1008295539
score_result: ScoreResult = (
ScoreResult(score=result) if isinstance(result, float) else result # type: ignore # noqa
) # type: ignore
test_case_outputs.append(
TestCaseOutput(
id=ids[i],
output=candidate_output_list[i],
score=score,
score_result=score_result,
)
)

run = TestRun(
name=run_name,
test_case_outputs=test_case_outputs,
run = TestRun.from_flattened(
run_name=run_name,
ids=ids,
candidate_output_list=candidate_output_list,
scores=all_scores,
model_name=model_name,
model_version=model_version,
foundation_model=foundation_model,
62 changes: 62 additions & 0 deletions arthur_bench/scoring/scorer.py
@@ -1,5 +1,6 @@
from abc import abstractmethod, ABC
import sys
import asyncio
import json
import logging
from pathlib import Path
@@ -20,6 +21,7 @@


SINGLE_ITEM_BATCH_DEFAULT = 1
ASYNC_BATCH_DEFAULT = 5


TScorer = TypeVar("TScorer", bound="Scorer")
@@ -145,6 +147,66 @@ def run(

return all_scores

async def arun_batch(
self,
candidate_batch: List[str],
reference_batch: Optional[List[str]] = None,
input_text_batch: Optional[List[str]] = None,
context_batch: Optional[List[str]] = None,
) -> Union[List[float], List[ScoreResult]]:
"""
Async version of run_batch method.
"""
raise NotImplementedError

async def arun(
self,
candidate_outputs: List[str],
reference_outputs: Optional[List[str]] = None,
inputs: Optional[List[str]] = None,
contexts: Optional[List[str]] = None,
batch_size: int = ASYNC_BATCH_DEFAULT,
) -> Union[List[float], List[ScoreResult]]:
"""
Async version of run method.
"""
all_scores: Union[List[float], List[ScoreResult]] = []
tasks = []
for i in range(0, len(candidate_outputs), batch_size):
input_batch = (
list(inputs[i : i + batch_size]) if inputs is not None else None
)
ref_batch = (
list(reference_outputs[i : i + batch_size])
if reference_outputs is not None
else None
)

context_batch = None if contexts is None else contexts[i : i + batch_size]
task = asyncio.create_task(
self.arun_batch(
candidate_outputs[i : i + batch_size],
ref_batch,
input_batch,
context_batch,
)
)
tasks.append(task)

results = await asyncio.gather(*tasks) # returns tasks in order

# validate arun_batch results and extend all_scores
for scores in results:
if self.is_categorical():
for score in scores:
if isinstance(score, float) or score.category is None:
raise ValueError(
"categorical scorer must return categorical results"
)
all_scores.extend(scores) # type: ignore

return all_scores

def to_dict(self, warn=False):
"""
Provides a json serializable representation of the scorer.
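For reference, a minimal sketch of a concrete scorer that overrides arun_batch. The class, its name, and its length-ratio scoring logic are assumptions for illustration; the method signatures follow the base class above, and it presumes (as the test doubles below do) that name() and run_batch() are the methods a Scorer subclass must provide.

import asyncio
from typing import List, Optional

from arthur_bench.scoring import Scorer


class HypotheticalAsyncScorer(Scorer):
    # Illustrative scorer: scores candidates by length ratio to the reference.

    @staticmethod
    def name() -> str:
        return "hypothetical_async"

    def run_batch(
        self,
        candidate_batch: List[str],
        reference_batch: Optional[List[str]] = None,
        input_text_batch: Optional[List[str]] = None,
        context_batch: Optional[List[str]] = None,
    ) -> List[float]:
        assert reference_batch is not None, "this illustrative scorer needs references"
        return [
            min(len(c), len(r)) / max(len(c), len(r), 1)
            for c, r in zip(candidate_batch, reference_batch)
        ]

    async def arun_batch(
        self,
        candidate_batch: List[str],
        reference_batch: Optional[List[str]] = None,
        input_text_batch: Optional[List[str]] = None,
        context_batch: Optional[List[str]] = None,
    ) -> List[float]:
        # A real implementation would await an async client here; this sketch
        # just yields to the event loop and reuses the sync scoring logic.
        await asyncio.sleep(0)
        return self.run_batch(
            candidate_batch, reference_batch, input_text_batch, context_batch
        )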
18 changes: 18 additions & 0 deletions tests/test_testsuite.py
@@ -59,6 +59,15 @@ def run_batch(
) -> List[float]:
return [0.9, 0.7]

async def arun_batch(
self,
reference_batch: List[str],
candidate_batch: List[str],
input_text_batch: Optional[List[str]] = None,
context_batch: Optional[List[str]] = None,
) -> List[float]:
return [0.9, 0.7]


class CustomScorer(Scorer):
def __init__(self, custom_name="param_name"):
@@ -77,6 +86,15 @@ def run_batch(
) -> List[float]:
return [0.5 for _ in range(len(reference_batch))]

async def arun_batch(
self,
reference_batch: List[str],
candidate_batch: List[str],
input_text_batch: Optional[List[str]] = None,
context_batch: Optional[List[str]] = None,
) -> List[float]:
return [0.5 for _ in range(len(reference_batch))]


@pytest.fixture(scope="session")
def mock_load_scoring():