[pf-evals] Enable async batch run for evaluators by default (#3614)
ninghu authored Aug 16, 2024
1 parent b0cc956 commit 89aa66f
Showing 28 changed files with 1,026 additions and 694 deletions.
5 changes: 5 additions & 0 deletions src/promptflow-evals/CHANGELOG.md
@@ -1,9 +1,14 @@
# Release History

## v0.3.3 (Upcoming)
### Features Added

### Bugs Fixed
- Fixed evaluators to accept (non-Azure) OpenAI configurations.

### Improvements
- The `PF_EVALS_BATCH_USE_ASYNC` environment variable now defaults to `true`, enabling asynchronous batch runs for async-enabled built-in evaluators and improving performance.

## v0.3.2 (2024-08-13)
### Features Added
- Introduced `JailbreakAdversarialSimulator` for customers who need to run jailbreak and non-jailbreak adversarial simulations at the same time. More info in the README at `/promptflow/evals/synthetic/README.md#jailbreak-simulator`.
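To illustrate the v0.3.3 improvement above, here is a minimal opt-out sketch. It assumes `PF_EVALS_BATCH_USE_ASYNC` is read when the batch run starts, so setting it to `"false"` before calling `evaluate` restores the previous synchronous behavior; the model configuration values and data file are placeholders, not part of the commit.

```python
import os

from promptflow.core import AzureOpenAIModelConfiguration
from promptflow.evals.evaluate import evaluate
from promptflow.evals.evaluators import FluencyEvaluator

# Assumption: the flag is read when the batch run starts, so setting it
# before evaluate() is called is sufficient. "false" opts out of the new
# async default; leaving it unset keeps async batch runs enabled.
os.environ["PF_EVALS_BATCH_USE_ASYNC"] = "false"

# Placeholder model configuration and data file for this sketch.
model_config = AzureOpenAIModelConfiguration(
    azure_endpoint="https://<your-resource>.openai.azure.com",
    api_key="<your-api-key>",
    azure_deployment="<your-deployment>",
)

result = evaluate(
    data="data.jsonl",
    evaluators={"fluency": FluencyEvaluator(model_config)},
)
```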
5 changes: 5 additions & 0 deletions src/promptflow-evals/promptflow/evals/_constants.py
@@ -5,6 +5,7 @@

class EvaluationMetrics:
"""Metrics for model evaluation."""

GPT_GROUNDEDNESS = "gpt_groundedness"
GPT_RELEVANCE = "gpt_relevance"
GPT_COHERENCE = "gpt_coherence"
@@ -21,6 +22,7 @@ class EvaluationMetrics:

class Prefixes:
"""Column prefixes for inputs and outputs."""

INPUTS = "inputs."
OUTPUTS = "outputs."
TSG_OUTPUTS = "__outputs."
@@ -32,3 +34,6 @@ class Prefixes:

PF_BATCH_TIMEOUT_SEC_DEFAULT = 3600
PF_BATCH_TIMEOUT_SEC = "PF_BATCH_TIMEOUT_SEC"

OTEL_EXPORTER_OTLP_TRACES_TIMEOUT = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT"
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT_DEFAULT = 60
@@ -5,7 +5,12 @@

from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP, PF_FLOW_META_LOAD_IN_SUBPROCESS
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.evals._constants import PF_BATCH_TIMEOUT_SEC, PF_BATCH_TIMEOUT_SEC_DEFAULT
from promptflow.evals._constants import (
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT_DEFAULT,
PF_BATCH_TIMEOUT_SEC,
PF_BATCH_TIMEOUT_SEC_DEFAULT,
)
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
@@ -23,9 +28,11 @@ class BatchRunContext:
~promptflow.evals.evaluate.proxy_client.ProxyClient
]
"""

def __init__(self, client) -> None:
self.client = client
self._is_timeout_set_by_system = False
self._is_batch_timeout_set_by_system = False
self._is_otel_timeout_set_by_system = False

def __enter__(self):
if isinstance(self.client, CodeClient):
@@ -38,7 +45,12 @@ def __enter__(self):

if os.environ.get(PF_BATCH_TIMEOUT_SEC) is None:
os.environ[PF_BATCH_TIMEOUT_SEC] = str(PF_BATCH_TIMEOUT_SEC_DEFAULT)
self._is_timeout_set_by_system = True
self._is_batch_timeout_set_by_system = True

# Work around the OpenTelemetry exporter timeout issue when multiple evaluators are running
if os.environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT) is None:
os.environ[OTEL_EXPORTER_OTLP_TRACES_TIMEOUT] = str(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT_DEFAULT)
self._is_otel_timeout_set_by_system = True

# For addressing the issue of asyncio event loop closed on Windows
set_event_loop_policy()
@@ -51,6 +63,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None)
os.environ.pop(PF_FLOW_META_LOAD_IN_SUBPROCESS, None)

if self._is_timeout_set_by_system:
if self._is_batch_timeout_set_by_system:
os.environ.pop(PF_BATCH_TIMEOUT_SEC, None)
self._is_timeout_set_by_system = False
self._is_batch_timeout_set_by_system = False

if self._is_otel_timeout_set_by_system:
os.environ.pop(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None)
self._is_otel_timeout_set_by_system = False
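As a reading aid for the `BatchRunContext` changes above, here is a small, generic sketch of the pattern being applied: apply a default to an environment variable only when the caller has not set it, remember that the default came from us, and remove it on exit so user-supplied values are never clobbered. The `default_env` helper is illustrative only and not part of promptflow.

```python
import os
from contextlib import contextmanager


@contextmanager
def default_env(name: str, default: str):
    """Set `name` to `default` only if it is unset, and remove it again on
    exit so explicit user-provided values always take precedence."""
    set_by_us = False
    if os.environ.get(name) is None:
        os.environ[name] = default
        set_by_us = True
    try:
        yield
    finally:
        if set_by_us:
            os.environ.pop(name, None)


# Usage sketch with the two timeouts handled by BatchRunContext.
with default_env("PF_BATCH_TIMEOUT_SEC", "3600"), default_env("OTEL_EXPORTER_OTLP_TRACES_TIMEOUT", "60"):
    pass  # the batch run would execute here
```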
@@ -51,9 +51,7 @@ def get_metrics(self, proxy_run):

@staticmethod
def _should_batch_use_async(flow):
# TODO: Change default to true after promptflow-core releases fix for error handler for async prompty
# https://github.com/microsoft/promptflow/pull/3598
if os.getenv("PF_EVALS_BATCH_USE_ASYNC", "false").lower() == "true":
if os.getenv("PF_EVALS_BATCH_USE_ASYNC", "true").lower() == "true":
if hasattr(flow, "__call__") and inspect.iscoroutinefunction(flow.__call__):
return True
elif inspect.iscoroutinefunction(flow):
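The `_should_batch_use_async` change above flips the `PF_EVALS_BATCH_USE_ASYNC` default to `"true"` and then checks whether the flow is async-capable. The sketch below mirrors that coroutine check with toy evaluator classes; the class names are illustrative, not promptflow's built-in evaluators.

```python
import inspect


class AsyncEvaluator:
    async def __call__(self, *, question: str, answer: str) -> dict:
        return {"score": 1.0}


class SyncEvaluator:
    def __call__(self, *, question: str, answer: str) -> dict:
        return {"score": 1.0}


def should_use_async(flow) -> bool:
    # Mirrors the check above: a flow qualifies if it is a callable object
    # whose __call__ is a coroutine function, or is itself a coroutine function.
    if hasattr(flow, "__call__") and inspect.iscoroutinefunction(flow.__call__):
        return True
    return inspect.iscoroutinefunction(flow)


print(should_use_async(AsyncEvaluator()))  # True
print(should_use_async(SyncEvaluator()))   # False
```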
174 changes: 75 additions & 99 deletions src/promptflow-evals/promptflow/evals/evaluators/_chat/_chat.py
@@ -1,15 +1,15 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import asyncio
import json
import logging
from concurrent.futures import as_completed
from typing import Dict, List, Union

import numpy as np

from promptflow._utils.async_utils import async_run_allowing_running_loop
from promptflow.core import AzureOpenAIModelConfiguration, OpenAIModelConfiguration
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

from .._coherence import CoherenceEvaluator
from .._fluency import FluencyEvaluator
@@ -20,7 +20,58 @@
logger = logging.getLogger(__name__)


class _AsyncChatEvaluator:
class ChatEvaluator:
"""
Initialize a chat evaluator configured for a specific Azure OpenAI model.
:param model_config: Configuration for the Azure OpenAI model.
:type model_config: Union[~promptflow.core.AzureOpenAIModelConfiguration,
~promptflow.core.OpenAIModelConfiguration]
:param eval_last_turn: Set to True to evaluate only the most recent exchange in the dialogue,
focusing on the latest user inquiry and the assistant's corresponding response. Defaults to False
:type eval_last_turn: bool
:param parallel: If True, use parallel execution for evaluators. Else, use sequential execution.
Default is True.
:type parallel: bool
:return: A function that evaluates and generates metrics for "chat" scenario.
:rtype: Callable
**Usage**
.. code-block:: python
chat_eval = ChatEvaluator(model_config)
conversation = [
{"role": "user", "content": "What is the value of 2 + 2?"},
{"role": "assistant", "content": "2 + 2 = 4", "context": {
"citations": [
{"id": "math_doc.md", "content": "Information about additions: 1 + 2 = 3, 2 + 2 = 4"}
]
}
}
]
result = chat_eval(conversation=conversation)
**Output format**
.. code-block:: python
{
"evaluation_per_turn": {
"gpt_retrieval": [1.0, 2.0],
"gpt_groundedness": [5.0, 2.0],
"gpt_relevance": [3.0, 5.0],
"gpt_coherence": [1.0, 2.0],
"gpt_fluency": [3.0, 5.0]
},
"gpt_retrieval": 1.5,
"gpt_groundedness": 3.5,
"gpt_relevance": 4.0,
"gpt_coherence": 1.5,
"gpt_fluency": 4.0
}
"""

def __init__(
self,
model_config: Union[AzureOpenAIModelConfiguration, OpenAIModelConfiguration],
@@ -32,19 +83,19 @@ def __init__(

# TODO: Need a built-in evaluator for retrieval. It needs to be added to `self._rag_evaluators` collection
self._rag_evaluators = [
GroundednessEvaluator(model_config)._to_async(),
RelevanceEvaluator(model_config)._to_async(),
GroundednessEvaluator(model_config),
RelevanceEvaluator(model_config),
]
self._non_rag_evaluators = [
CoherenceEvaluator(model_config)._to_async(),
FluencyEvaluator(model_config)._to_async(),
CoherenceEvaluator(model_config),
FluencyEvaluator(model_config),
]
# TODO: Temporary workaround to close the gap of missing retrieval score
# https://msdata.visualstudio.com/Vienna/_workitems/edit/3186644
# For long term, we need to add a built-in evaluator for retrieval after prompt is generalized for QA and Chat
self._retrieval_chat_evaluator = RetrievalChatEvaluator(model_config)._to_async()
self._retrieval_chat_evaluator = RetrievalChatEvaluator(model_config)

async def __call__(self, *, conversation, **kwargs):
def __call__(self, *, conversation, **kwargs):
"""
Evaluates chat scenario.
@@ -54,7 +105,6 @@ async def __call__(self, *, conversation, **kwargs):
:return: The scores for Chat scenario.
:rtype: dict
"""

self._validate_conversation(conversation)

# Extract questions, answers and contexts from conversation
@@ -100,20 +150,22 @@

if self._parallel:
# Parallel execution
tasks = [
self._evaluate_turn(turn_num, questions, answers, contexts, evaluator)
for evaluator in selected_evaluators
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
logger.warning(f"Exception occurred during evaluation: {result}")
else:
with ThreadPoolExecutor() as executor:
future_to_evaluator = {
executor.submit(
self._evaluate_turn, turn_num, questions, answers, contexts, evaluator
): evaluator
for evaluator in selected_evaluators
}

for future in as_completed(future_to_evaluator):
result = future.result()
current_turn_result.update(result)
else:
# Sequential execution
for evaluator in selected_evaluators:
result = await self._evaluate_turn(turn_num, questions, answers, contexts, evaluator)
async_evaluator = evaluator._to_async()
result = self._evaluate_turn(turn_num, questions, answers, contexts, async_evaluator)
current_turn_result.update(result)

per_turn_results.append(current_turn_result)
@@ -132,20 +184,20 @@ async def __call__(self, *, conversation, **kwargs):

# Run RetrievalChatEvaluator and merge the results
if compute_rag_based_metrics:
retrieval_score = await self._retrieval_chat_evaluator(conversation=conversation_slice)
retrieval_score = self._retrieval_chat_evaluator(conversation=conversation_slice)
aggregated["gpt_retrieval"] = retrieval_score["gpt_retrieval"]
aggregated["evaluation_per_turn"]["gpt_retrieval"] = retrieval_score["evaluation_per_turn"]["gpt_retrieval"]
aggregated = dict(sorted(aggregated.items()))

return aggregated

async def _evaluate_turn(self, turn_num, questions, answers, contexts, evaluator):
def _evaluate_turn(self, turn_num, questions, answers, contexts, evaluator):
try:
question = questions[turn_num] if turn_num < len(questions) else ""
answer = answers[turn_num] if turn_num < len(answers) else ""
context = contexts[turn_num] if turn_num < len(contexts) else ""

score = await evaluator(question=question, answer=answer, context=context)
score = evaluator(question=question, answer=answer, context=context)

return score
except Exception as e: # pylint: disable=broad-exception-caught
@@ -238,79 +290,3 @@ def _validate_conversation(self, conversation: List[Dict]):
# Ensure the conversation ends with an assistant's turn
if expected_role != "user":
raise ValueError("The conversation must end with an assistant's turn.")


class ChatEvaluator:
"""
Initialize a chat evaluator configured for a specific Azure OpenAI model.
:param model_config: Configuration for the Azure OpenAI model.
:type model_config: Union[~promptflow.core.AzureOpenAIModelConfiguration,
~promptflow.core.OpenAIModelConfiguration]
:param eval_last_turn: Set to True to evaluate only the most recent exchange in the dialogue,
focusing on the latest user inquiry and the assistant's corresponding response. Defaults to False
:type eval_last_turn: bool
:param parallel: If True, use parallel execution for evaluators. Else, use sequential execution.
Default is True.
:type parallel: bool
:return: A function that evaluates and generates metrics for "chat" scenario.
:rtype: Callable
**Usage**
.. code-block:: python
chat_eval = ChatEvaluator(model_config)
conversation = [
{"role": "user", "content": "What is the value of 2 + 2?"},
{"role": "assistant", "content": "2 + 2 = 4", "context": {
"citations": [
{"id": "math_doc.md", "content": "Information about additions: 1 + 2 = 3, 2 + 2 = 4"}
]
}
}
]
result = chat_eval(conversation=conversation)
**Output format**
.. code-block:: python
{
"evaluation_per_turn": {
"gpt_retrieval": [1.0, 2.0],
"gpt_groundedness": [5.0, 2.0],
"gpt_relevance": [3.0, 5.0],
"gpt_coherence": [1.0, 2.0],
"gpt_fluency": [3.0, 5.0]
}
"gpt_retrieval": 1.5,
"gpt_groundedness": 3.5,
"gpt_relevance": 4.0,
"gpt_coherence": 1.5,
"gpt_fluency": 4.0
}
"""

def __init__(
self,
model_config: Union[AzureOpenAIModelConfiguration, OpenAIModelConfiguration],
eval_last_turn: bool = False,
parallel: bool = True,
):
self._async_evaluator = _AsyncChatEvaluator(model_config, eval_last_turn, parallel)

def __call__(self, *, conversation, **kwargs):
"""
Evaluates chat scenario.
:keyword conversation: The conversation to be evaluated. Each turn should have "role" and "content" keys.
"context" key is optional for assistant's turn and should have "citations" key with list of citations.
:paramtype conversation: List[Dict]
:return: The scores for Chat scenario.
:rtype: dict
"""
return async_run_allowing_running_loop(self._async_evaluator, conversation=conversation, **kwargs)

def _to_async(self):
return self._async_evaluator
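For context on the `ChatEvaluator` rework above, the sketch below mirrors the per-turn fan-out: each evaluator for a turn is submitted to a thread pool and the scores are merged as futures complete. It uses the standard-library `ThreadPoolExecutor` and stand-in evaluator functions; the real class uses promptflow's context-propagating `ThreadPoolExecutorWithContext` and the built-in evaluators.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


# Stand-in evaluators for this sketch; each returns a dict of metric -> score.
def coherence(question: str, answer: str, context: str) -> dict:
    return {"gpt_coherence": 4.0}


def fluency(question: str, answer: str, context: str) -> dict:
    return {"gpt_fluency": 5.0}


def evaluate_turn_parallel(question: str, answer: str, context: str, evaluators) -> dict:
    """Fan one turn out to all evaluators and merge their scores as results
    complete, mirroring the ThreadPoolExecutor/as_completed pattern above."""
    turn_result: dict = {}
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(evaluator, question, answer, context)
            for evaluator in evaluators
        ]
        for future in as_completed(futures):
            turn_result.update(future.result())
    return turn_result


print(evaluate_turn_parallel("What is 2 + 2?", "2 + 2 = 4", "", [coherence, fluency]))
```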