Port of Batch Engine code #40133

Merged · 4 commits · Mar 25, 2025
Changes from 1 commit
@@ -3,8 +3,15 @@
# ---------------------------------------------------------
from .eval_run_context import EvalRunContext
from .code_client import CodeClient
-from .proxy_client import ProxyClient
+from .proxy_client import ProxyClient, ProxyRun
+from ._run_submitter_client import RunSubmitterClient
from .target_run_context import TargetRunContext
-from .proxy_client import ProxyRun

-__all__ = ["CodeClient", "ProxyClient", "EvalRunContext", "TargetRunContext", "ProxyRun"]
+__all__ = [
+    "CodeClient",
+    "ProxyClient",
+    "EvalRunContext",
+    "TargetRunContext",
+    "ProxyRun",
+    "RunSubmitterClient",
+]
@@ -0,0 +1,104 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import logging
import pandas as pd
import sys
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from os import PathLike
from typing import Any, Callable, Dict, Final, List, Mapping, Optional, Sequence, Union, cast

from .batch_clients import BatchClientRun, HasAsyncCallable
from ...legacy._batch_engine._run_submitter import RunSubmitter
from ...legacy._batch_engine._config import BatchEngineConfig
from ...legacy._batch_engine._run import Run


LOGGER = logging.getLogger(__name__)


class RunSubmitterClient:
def __init__(self, config: Optional[BatchEngineConfig] = None) -> None:
self._config = config or BatchEngineConfig(LOGGER, use_async=True)
self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")

def run(
self,
flow: Callable,
data: Union[str, PathLike, pd.DataFrame],
column_mapping: Optional[Dict[str, str]] = None,
evaluator_name: Optional[str] = None,
**kwargs: Any,
) -> BatchClientRun:
if not isinstance(data, pd.DataFrame):
# Should never get here
raise ValueError("Data must be a pandas DataFrame")
if not column_mapping:
raise ValueError("Column mapping must be provided")

# The column mappings are indexed by "data" to indicate that they come from the
# data input. Update the inputs so that each entry is a dictionary with a "data"
# key that contains the original input data.
inputs = [{"data": input_data} for input_data in data.to_dict(orient="records")]

# always uses async behind the scenes
if isinstance(flow, HasAsyncCallable):
flow = flow._to_async() # pylint: disable=protected-access

run_submitter = RunSubmitter(self._config)
run_future = self._thread_pool.submit(
run_submitter.submit,
dynamic_callable=flow,
inputs=inputs,
column_mapping=column_mapping,
name_prefix=evaluator_name,
created_on=kwargs.pop("created_on", None),
storage_creator=kwargs.pop("storage_creator", None),
**kwargs,
)

return run_future

def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pd.DataFrame:
run = self._get_run(client_run)

data: Dict[str, List[Any]] = defaultdict(list)
stop_at: Final[int] = self._config.default_num_results if not all_results else sys.maxsize

def _update(prefix: str, items: Sequence[Mapping[str, Any]]) -> None:
for i, line in enumerate(items):
if i >= stop_at:
break
for k, value in line.items():
key = f"{prefix}.{k}"
data[key].append(value)

_update("inputs", run.inputs)
_update("outputs", run.outputs)

df = pd.DataFrame(data).reindex(columns=[k for k in data.keys()])
return df

def get_metrics(self, client_run: BatchClientRun) -> Dict[str, Any]:
run = self._get_run(client_run)
return dict(run.metrics)

def get_run_summary(self, client_run: BatchClientRun) -> Dict[str, Any]:
run = self._get_run(client_run)

total_lines = run.result.total_lines if run.result else 0
failed_lines = run.result.failed_lines if run.result else 0

return {
"status": run.status.value,
"duration": str(run.duration),
"completed_lines": total_lines - failed_lines,
"failed_lines": failed_lines,
# "log_path": "",
}

@staticmethod
def _get_run(run: BatchClientRun) -> Run:
return cast(Future[Run], run).result()
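For reviewers, a minimal usage sketch of the new RunSubmitterClient (illustrative only, not part of this change; the import path, the answer_length evaluator, and the "${data.query}" mapping syntax are assumptions):

import pandas as pd

# Assumed import path; RunSubmitterClient is re-exported from the package
# __init__.py updated earlier in this diff.
from azure.ai.evaluation._evaluate._batch_run import RunSubmitterClient

def answer_length(*, query: str, **kwargs):
    # Hypothetical per-row evaluator: returns one output column per row.
    return {"length": len(query)}

data = pd.DataFrame([{"query": "What is Azure?"}, {"query": "Define batch engine."}])

client = RunSubmitterClient()  # defaults to BatchEngineConfig(LOGGER, use_async=True)
run = client.run(
    flow=answer_length,
    data=data,  # must already be a DataFrame; other inputs raise ValueError
    column_mapping={"query": "${data.query}"},  # assumed mapping syntax
    evaluator_name="answer_length",
)

print(client.get_details(run))      # inputs.* / outputs.* columns, capped at default_num_results
print(client.get_metrics(run))      # dict of aggregated metrics
print(client.get_run_summary(run))  # status, duration, completed_lines, failed_lines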
@@ -0,0 +1,82 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import pandas
from os import PathLike
from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, Union, runtime_checkable


class BatchClientRun(Protocol):
"""The protocol for the batch client run."""

pass


@runtime_checkable
class HasAsyncCallable(Protocol):
"""The protocol for an object that has an async callable."""

def _to_async(self) -> Callable[[Any, Any], Awaitable[Any]]: ...


class BatchClient(Protocol):
"""The protocol for the batch client. This allows for running a flow on a data source
and getting the details of the run."""

def run(
self,
flow: Callable,
data: Union[str, PathLike, pandas.DataFrame],
column_mapping: Optional[Dict[str, str]] = None,
evaluator_name: Optional[str] = None,
**kwargs: Any,
) -> BatchClientRun:
"""Run the given flow on the data with the given column mapping.

:param flow: The flow to run.
:type flow: Union[Callable, HasAsyncCallable]
:param data: The JSONL file containing the data to run the flow on,
or the already loaded data.
:type data: Union[str, PathLike, pandas.DataFrame]
:param column_mapping: The column mapping to use.
:type column_mapping: Optional[Dict[str, str]]
:param evaluator_name: The name of the evaluator, used to name the run.
:type evaluator_name: Optional[str]
:param kwargs: Additional keyword arguments to pass to the flow.
:return: The resulting run.
:rtype: BatchClientRun
"""
...

def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pandas.DataFrame:
"""Get the details of the run.

:param client_run: The run to get the details of.
:type client_run: BatchClientRun
:param all_results: Whether to get all results.
:type all_results: bool
:return: The details of the run.
:rtype: pandas.DataFrame
"""
...

def get_metrics(self, client_run: BatchClientRun) -> Dict[str, Any]:
"""Get the metrics of the run.

:param client_run: The run to get the metrics of.
:type client_run: BatchClientRun
:return: The metrics of the run.
:rtype: Dict[str, Any]
"""
...

def get_run_summary(self, client_run: BatchClientRun) -> Dict[str, Any]:
"""Get the summary of the run.

:param client_run: The run to get the summary of.
:type client_run: BatchClientRun
:return: The summary of the run.
:rtype: Dict[str, Any]
"""
...
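Since BatchClient is a typing Protocol (and, unlike HasAsyncCallable, not marked @runtime_checkable), conformance is checked structurally by type checkers rather than with isinstance. A small consumer sketch (the evaluate_with helper is hypothetical):

from os import PathLike
from typing import Any, Callable, Dict, Union

import pandas

def evaluate_with(
    client: BatchClient,
    flow: Callable,
    data: Union[str, PathLike, pandas.DataFrame],
    column_mapping: Dict[str, str],
) -> Dict[str, Any]:
    # Any client that structurally matches BatchClient (e.g. CodeClient,
    # ProxyClient, or the new RunSubmitterClient) can be passed here.
    run = client.run(flow=flow, data=data, column_mapping=column_mapping)
    return {
        "details": client.get_details(run),
        "metrics": client.get_metrics(run),
        "summary": client.get_run_summary(run),
    }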
@@ -0,0 +1,9 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# NOTE: This is a direct port of the bare minimum needed for BatchEngine functionality from
# the original Promptflow code. The goal here is expediency, not elegance. As such
# parts of this code may be a little "quirky", seem incomplete in places, or contain
# longer TODO comments than usual. In a future code update, large swaths of this code
# will be refactored or deleted outright.
@@ -0,0 +1,45 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from dataclasses import dataclass
from logging import Logger

from ..._constants import PF_BATCH_TIMEOUT_SEC_DEFAULT


@dataclass
class BatchEngineConfig:
"""Context for a batch of evaluations. This will contain the configuration,
logging, and other needed information."""

logger: Logger
"""The logger to use for logging messages."""

batch_timeout_seconds: int = PF_BATCH_TIMEOUT_SEC_DEFAULT
"""The maximum amount of time to wait for all evaluations in the batch to complete."""

run_timeout_seconds: int = 600
"""The maximum amount of time to wait for an evaluation to run against a single entry
in the data input to complete."""

max_concurrency: int = 10
"""The maximum number of evaluations to run concurrently."""

use_async: bool = True
"""Whether to use asynchronous evaluation."""

default_num_results: int = 100
"""The default number of results to return if you don't ask for all results."""

def __post_init__(self):
if self.logger is None:
raise ValueError("logger cannot be None")
if self.batch_timeout_seconds <= 0:
raise ValueError("batch_timeout_seconds must be greater than 0")
if self.run_timeout_seconds <= 0:
raise ValueError("run_timeout_seconds must be greater than 0")
if self.max_concurrency <= 0:
raise ValueError("max_concurrency must be greater than 0")
if self.default_num_results <= 0:
raise ValueError("default_num_results must be greater than 0")