From 1c7fb492a90f12a5e65e7f1cca72477661cb9678 Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Wed, 15 May 2024 15:10:47 +0200
Subject: [PATCH 1/7] feat: Implement `PipelinePair` primitive and related utility functions

---
 haystack_experimental/__init__.py          |   3 +
 .../evaluation/pipeline_pair.py            | 197 +++++++++++++++
 haystack_experimental/evaluation/util.py   |  91 +++++++
 haystack_experimental/testing/__init__.py  |   3 +
 .../testing/sample_components.py           |  35 +++
 test/evaluation/__init__.py                |   3 +
 test/evaluation/test_pipeline_pair.py      | 236 ++++++++++++++++++
 test/evaluation/test_util.py               |  54 ++++
 8 files changed, 622 insertions(+)
 create mode 100644 haystack_experimental/__init__.py
 create mode 100644 haystack_experimental/evaluation/pipeline_pair.py
 create mode 100644 haystack_experimental/evaluation/util.py
 create mode 100644 haystack_experimental/testing/__init__.py
 create mode 100644 haystack_experimental/testing/sample_components.py
 create mode 100644 test/evaluation/__init__.py
 create mode 100644 test/evaluation/test_pipeline_pair.py
 create mode 100644 test/evaluation/test_util.py

diff --git a/haystack_experimental/__init__.py b/haystack_experimental/__init__.py
new file mode 100644
index 00000000..c1764a6e
--- /dev/null
+++ b/haystack_experimental/__init__.py
@@ -0,0 +1,3 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/haystack_experimental/evaluation/pipeline_pair.py b/haystack_experimental/evaluation/pipeline_pair.py
new file mode 100644
index 00000000..2a3e0dcd
--- /dev/null
+++ b/haystack_experimental/evaluation/pipeline_pair.py
@@ -0,0 +1,197 @@
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple
+
+from haystack import Pipeline
+
+
+@dataclass(frozen=True)
+class PipelinePair:
+    """
+    A pair of pipelines that are linked together and
+    executed sequentially.
+
+    :param first:
+        The first pipeline in the sequence.
+    :param second:
+        The second pipeline in the sequence.
+    :param outputs_to_inputs:
+        A mapping of the outputs of the first pipeline to the
+        inputs of the second pipeline in the following format:
+        `"name_of_component.name_of_output": "name_of_component.name_of_input"`.
+        A single output can be mapped to multiple inputs.
+    :param map_first_outputs:
+        A function that post-processes the outputs of the first
+        pipeline, which it receives as its (only) argument.
+    :param included_first_outputs:
+        Names of components in the first pipeline whose outputs
+        should be included in the final outputs.
+    :param included_second_outputs:
+        Names of components in the second pipeline whose outputs
+        should be included in the final outputs.
+    """
+
+    first: Pipeline
+    second: Pipeline
+    outputs_to_inputs: Dict[str, List[str]]
+    map_first_outputs: Optional[Callable] = None
+    included_first_outputs: Optional[Set[str]] = None
+    included_second_outputs: Optional[Set[str]] = None
+
+    def __post_init__(self):
+        first_outputs = self.first.outputs(include_components_with_connected_outputs=True)
+        second_inputs = self.second.inputs(include_components_with_connected_inputs=True)
+        seen_second_inputs = set()
+
+        # Validate the mapping of outputs from the first pipeline
+        # to the inputs of the second pipeline.
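+        # For example, a (hypothetical) mapping such as
+        #   {"retriever.documents": ["evaluator.retrieved_documents"]}
+        # would connect the `documents` output of a `retriever` component in
+        # the first pipeline to the `retrieved_documents` input of an
+        # `evaluator` component in the second.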
+        for first_out, second_ins in self.outputs_to_inputs.items():
+            first_comp_name, first_out_name = self._split_input_output_path(first_out)
+            if first_comp_name not in first_outputs:
+                raise ValueError(f"Output component '{first_comp_name}' not found in first pipeline.")
+            elif first_out_name not in first_outputs[first_comp_name]:
+                raise ValueError(
+                    f"Component '{first_comp_name}' in first pipeline does not have expected output '{first_out_name}'."
+                )
+
+            for second_in in second_ins:
+                if second_in in seen_second_inputs:
+                    raise ValueError(
+                        f"Input '{second_in}' in second pipeline is connected to multiple first pipeline outputs."
+                    )
+
+                second_comp_name, second_input_name = self._split_input_output_path(second_in)
+                if second_comp_name not in second_inputs:
+                    raise ValueError(f"Input component '{second_comp_name}' not found in second pipeline.")
+                elif second_input_name not in second_inputs[second_comp_name]:
+                    raise ValueError(
+                        f"Component '{second_comp_name}' in second pipeline does not have expected input '{second_input_name}'."
+                    )
+                seen_second_inputs.add(second_in)
+
+    def _validate_second_inputs(self, inputs: Dict[str, Dict[str, Any]]):
+        # Check if the connected input is also provided explicitly.
+        second_connected_inputs = [
+            self._split_input_output_path(p_h) for p in self.outputs_to_inputs.values() for p_h in p
+        ]
+        for component_name, input_name in second_connected_inputs:
+            provided_input = inputs.get(component_name)
+            if provided_input is None:
+                continue
+            elif input_name in provided_input:
+                raise ValueError(
+                    f"Second pipeline input '{component_name}.{input_name}' cannot be provided both explicitly and by the first pipeline."
+                )
+
+    def _split_input_output_path(self, path: str) -> Tuple[str, str]:
+        # Split the input/output path into component name and input/output name.
+        pos = path.find(".")
+        if pos == -1:
+            raise ValueError(
+                f"Invalid pipeline i/o path specifier '{path}' - Must be in the following format: <component_name>.<input/output_name>"
+            )
+        return path[:pos], path[pos + 1 :]
+
+    def _prepare_reqd_outputs_for_first_pipeline(self) -> Set[str]:
+        # To ensure that we have all the outputs from the first
+        # pipeline that are required by the second pipeline.
+        first_components_with_outputs = {self._split_input_output_path(p)[0] for p in self.outputs_to_inputs.keys()}
+        if self.included_first_outputs is not None:
+            first_components_with_outputs = first_components_with_outputs.union(self.included_first_outputs)
+        return first_components_with_outputs
+
+    def _map_first_second_pipeline_io(
+        self, first_outputs: Dict[str, Dict[str, Any]], second_inputs: Dict[str, Dict[str, Any]]
+    ) -> Dict[str, Dict[str, Any]]:
+        # Map the first pipeline outputs to the second pipeline inputs.
+        for first_output, second_input_candidates in self.outputs_to_inputs.items():
+            first_component, first_output = self._split_input_output_path(first_output)
+
+            # Each output from the first pipeline can be mapped to multiple inputs in the second pipeline.
+            for second_input in second_input_candidates:
+                second_component, second_input_socket = self._split_input_output_path(second_input)
+
+                second_component_inputs = second_inputs.get(second_component)
+                if second_component_inputs is not None:
+                    # Pre-condition should've been validated earlier.
+                    assert second_input_socket not in second_component_inputs
+                    # The first pipeline's output should also be guaranteed at this point.
+                    second_component_inputs[second_input_socket] = first_outputs[first_component][first_output]
+                else:
+                    second_inputs[second_component] = {
+                        second_input_socket: first_outputs[first_component][first_output]
+                    }
+
+        return second_inputs
+
+    def run(
+        self, first_inputs: Dict[str, Dict[str, Any]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None
+    ) -> Dict[str, Dict[str, Any]]:
+        """
+        Execute the pipeline pair by invoking first pipeline
+        and then the second with the outputs of the former. This
+        assumes that both pipelines have the same input modality,
+        i.e., the shapes of the first pipeline's outputs match the
+        shapes of the second pipeline's inputs.
+
+        :param first_inputs:
+            The inputs to the first pipeline.
+        :param second_inputs:
+            The inputs to the second pipeline.
+        :returns:
+            A dictionary with the following keys:
+                - `first` - The outputs of the first pipeline.
+                - `second` - The outputs of the second pipeline.
+        """
+        second_inputs = second_inputs or {}
+        self._validate_second_inputs(second_inputs)
+
+        first_outputs = self.first.run(
+            first_inputs, include_outputs_from=self._prepare_reqd_outputs_for_first_pipeline()
+        )
+        if self.map_first_outputs is not None:
+            first_outputs = self.map_first_outputs(first_outputs)
+        second_inputs = self._map_first_second_pipeline_io(first_outputs, second_inputs)
+        second_outputs = self.second.run(second_inputs, include_outputs_from=self.included_second_outputs)
+
+        return {"first": first_outputs, "second": second_outputs}
+
+    def run_first_as_batch(
+        self, first_inputs: List[Dict[str, Dict[str, Any]]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None
+    ) -> Dict[str, Dict[str, Any]]:
+        """
+        Execute the pipeline pair by invoking the first pipeline
+        iteratively over the list of inputs and passing the cumulative
+        outputs to the second pipeline. This is suitable when the first
+        pipeline has a single logical input-to-output mapping and the
+        second pipeline expects multiple logical inputs, e.g. a retrieval
+        pipeline that accepts a single query and returns a list of documents
+        and an evaluation pipeline that accepts multiple lists of documents
+        and multiple lists of ground truth data.
+
+        :param first_inputs:
+            A batch of inputs to the first pipeline. A mapping
+            function must be provided to aggregate the outputs.
+        :param second_inputs:
+            The inputs to the second pipeline.
+        :returns:
+            A dictionary with the following keys:
+                - `first` - The (aggregate) outputs of the first pipeline.
+                - `second` - The outputs of the second pipeline.
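+
+        A minimal usage sketch (with hypothetical component names, a batch of
+        `queries` and ground-truth `truths`, and an aggregating mapping
+        function such as `aggregate_batched_pipeline_outputs`):
+
+            pair.run_first_as_batch(
+                first_inputs=[{"retriever": {"query": q}} for q in queries],
+                second_inputs={"evaluator": {"ground_truth_documents": truths}},
+            )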
+ """ + second_inputs = second_inputs or {} + self._validate_second_inputs(second_inputs) + + first_components_with_outputs = self._prepare_reqd_outputs_for_first_pipeline() + if self.map_first_outputs is None: + raise ValueError("Mapping function for first pipeline outputs must be provided for batch execution.") + + first_outputs: Dict[str, Dict[str, Any]] = self.map_first_outputs( + [self.first.run(i, include_outputs_from=first_components_with_outputs) for i in first_inputs] + ) + if not isinstance(first_outputs, dict): + raise ValueError("Mapping function must return an aggregate dictionary of outputs.") + + second_inputs = self._map_first_second_pipeline_io(first_outputs, second_inputs) + second_outputs = self.second.run(second_inputs, include_outputs_from=self.included_second_outputs) + + return {"first": first_outputs, "second": second_outputs} diff --git a/haystack_experimental/evaluation/util.py b/haystack_experimental/evaluation/util.py new file mode 100644 index 00000000..59216dd6 --- /dev/null +++ b/haystack_experimental/evaluation/util.py @@ -0,0 +1,91 @@ +from copy import deepcopy +from typing import Any, Dict, List + + +def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]: + """ + Combine the outputs of a pipeline that has been executed + iteratively over a batch of inputs. It performs a transpose + operation on the first and the third dimensions of the outputs. + + :param outputs: + A list of outputs from the pipeline, where each output + is a dictionary with the same keys and values with the + same keys. + :returns: + The combined outputs. + """ + # The pipeline is invoked iteratively over a batch of inputs, such + # that each element in the outputs corresponds to a single element in + # the batch input. + if len(outputs) == 0: + return {} + elif len(outputs) == 1: + return outputs[0] + + # We'll use the first output as a sentinel to determine + # if the shape of the rest of the outputs are the same. + sentinel = outputs[0] + for output in outputs[1:]: + if output.keys() != sentinel.keys(): + raise ValueError(f"Expected components '{list(sentinel.keys())}' but got '{list(output.keys())}'") + + for component_name, expected in sentinel.items(): + got = output[component_name] + if got.keys() != expected.keys(): + raise ValueError( + f"Expected outputs from component '{component_name}' to have keys '{list(expected.keys())}' but got '{list(got.keys())}'" + ) + + # The outputs are of the correct/same shape. Now to transpose + # the outermost list with the innermost dictionary. + transposed: Dict[str, Dict[str, Any]] = {} + for k, v in sentinel.items(): + transposed[k] = {k_h: [] for k_h in v.keys()} + + for output in outputs: + for component_name, component_outputs in output.items(): + dest = transposed[component_name] + for output_name, output_value in component_outputs.items(): + dest[output_name].append(output_value) + + return transposed + + +def deaggregate_batched_pipeline_inputs(inputs: Dict[str, Dict[str, List[Any]]]) -> List[Dict[str, Dict[str, Any]]]: + """ + Separate the inputs of a pipeline that has been batched along + its innermost dimension (component -> input -> values). It + performs a transpose operation on the first and the third dimensions + of the inputs. + + :param inputs: + A dictionary of pipeline inputs that maps + component-input pairs to lists of values. + :returns: + The separated inputs. 
+ """ + if len(inputs) == 0: + return [] + + sentinel = next(iter(inputs.values())) # First component's inputs + sentinel = next(iter(sentinel.values())) # First component's first input's values + + for component_name, component_inputs in inputs.items(): + for input_name, input_values in component_inputs.items(): + if len(input_values) != len(sentinel): + raise ValueError( + f"Expected input '{component_name}.{input_name}' to have {len(sentinel)} values but got {len(input_values)}" + ) + + proto = {k: {k_h: None for k_h in v.keys()} for k, v in inputs.items()} + transposed: List[Dict[str, Dict[str, Any]]] = [] + + for i in range(len(sentinel)): + new_dict = deepcopy(proto) + for component_name, component_inputs in inputs.items(): + for input_name, input_values in component_inputs.items(): + new_dict[component_name][input_name] = input_values[i] + transposed.append(new_dict) + + return transposed diff --git a/haystack_experimental/testing/__init__.py b/haystack_experimental/testing/__init__.py new file mode 100644 index 00000000..c1764a6e --- /dev/null +++ b/haystack_experimental/testing/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/haystack_experimental/testing/sample_components.py b/haystack_experimental/testing/sample_components.py new file mode 100644 index 00000000..b10d73ce --- /dev/null +++ b/haystack_experimental/testing/sample_components.py @@ -0,0 +1,35 @@ +from typing import Optional, List +from haystack.core.component import component + + +@component +class AddFixedValueBatch: + """ + Adds two values together. + """ + + def __init__(self, add: int = 1): + self.add = add + + @component.output_types(result=List[int]) + def run(self, value: List[int], add: Optional[List[int]] = None): + """ + Adds two values together. + """ + if add is None: + add = [self.add] * len(value) + return {"result": [v + a for v, a in zip(value, add)]} + + +@component +class DoubleBatch: + """ + Doubles the input value. + """ + + @component.output_types(value=List[int]) + def run(self, value: List[int]): + """ + Doubles the input value. 
+ """ + return {"value": [v * 2 for v in value]} diff --git a/test/evaluation/__init__.py b/test/evaluation/__init__.py new file mode 100644 index 00000000..c1764a6e --- /dev/null +++ b/test/evaluation/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/test/evaluation/test_pipeline_pair.py b/test/evaluation/test_pipeline_pair.py new file mode 100644 index 00000000..16007f8a --- /dev/null +++ b/test/evaluation/test_pipeline_pair.py @@ -0,0 +1,236 @@ +import pytest + +from haystack import Pipeline +from haystack_experimental.evaluation.pipeline_pair import PipelinePair +from haystack_experimental.evaluation.util import aggregate_batched_pipeline_outputs + +from haystack.testing.sample_components import AddFixedValue, Double +from haystack_experimental.testing.sample_components import AddFixedValueBatch, DoubleBatch + + +@pytest.fixture +def first_pipeline(): + first = Pipeline() + first.add_component("first_addition", AddFixedValue(add=10)) + first.add_component("second_addition", AddFixedValue(add=100)) + first.add_component("double", Double()) + first.connect("first_addition", "double") + first.connect("double", "second_addition") + return first + + +@pytest.fixture +def second_pipeline(): + second = Pipeline() + second.add_component("first_addition", AddFixedValue(add=1)) + second.add_component("second_addition", AddFixedValue(add=2)) + second.add_component("double", Double()) + second.connect("first_addition", "double") + second.connect("double", "second_addition") + return second + + +@pytest.fixture +def second_pipeline_batched(): + second = Pipeline() + second.add_component("first_addition", AddFixedValueBatch(add=1)) + second.add_component("second_addition", AddFixedValueBatch(add=2)) + second.add_component("double", DoubleBatch()) + second.connect("first_addition", "double") + second.connect("double", "second_addition") + return second + + +def test_pipeline_pair_init(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + ) + + +def test_pipeline_pair_invalid_io_specifier(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Invalid pipeline i/o path specifier"): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"nonexistent": ["nonexistent"]} + ) + + +def test_pipeline_pair_invalid_first_component(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Output component .* not found in first pipeline."): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"nonexistent.out": ["nonexistent.in"]} + ) + + +def test_pipeline_pair_invalid_first_component_output(first_pipeline, second_pipeline): + + with pytest.raises(ValueError, match="Component .* in first pipeline does not have expected output"): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"double.out": ["nonexistent.in"]} + ) + + +def test_pipeline_pair_invalid_second_component(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Input component .* not found in second pipeline."): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["nonexistent.in"]}, + ) + + +def test_pipeline_pair_invalid_second_component_input(first_pipeline, second_pipeline): + with pytest.raises(ValueError, 
match="Component .* in second pipeline does not have expected input"): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["second_addition.some_input"]}, + ) + + +def test_pipeline_pair_invalid_second_multiple_inputs(first_pipeline, second_pipeline): + with pytest.raises( + ValueError, match="Input .* in second pipeline is connected to multiple first pipeline outputs." + ): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={ + "first_addition.result": ["second_addition.value"], + "second_addition.result": ["second_addition.value"], + }, + ) + + +def test_pipeline_pair_run(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"double"}, + ) + + results = pair.run({"first_addition": {"value": 1}}) + assert results == { + "first": {"first_addition": {"result": 11}, "second_addition": {"result": 122}}, + "second": {"double": {"value": 24}, "second_addition": {"result": 26}}, + } + + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value", "first_addition.add"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + results = pair.run({"first_addition": {"value": 10}}) + assert results == { + "first": {"first_addition": {"result": 20}, "second_addition": {"result": 140}}, + "second": {"first_addition": {"result": 40}, "double": {"value": 80}, "second_addition": {"result": 82}}, + } + + +def test_pipeline_pair_run_second_extra_inputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + results = pair.run( + first_inputs={"first_addition": {"value": 1}}, + second_inputs={"first_addition": {"add": 10}, "second_addition": {"add": 100}}, + ) + assert results == { + "first": {"first_addition": {"result": 11}, "second_addition": {"result": 122}}, + "second": {"first_addition": {"result": 21}, "double": {"value": 42}, "second_addition": {"result": 142}}, + } + + +def test_pipeline_pair_run_invalid_second_extra_inputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + with pytest.raises( + ValueError, match="Second pipeline input .* cannot be provided both explicitly and by the first pipeline" + ): + results = pair.run( + first_inputs={"first_addition": {"value": 1}}, second_inputs={"first_addition": {"value": 10}} + ) + + +def test_pipeline_pair_run_map_first_outputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"double"}, + map_first_outputs=lambda x: {"first_addition": {"result": 0}, "second_addition": {"result": 0}}, + ) + + results = pair.run({"first_addition": {"value": 1}}) + assert results == 
+        "first": {"first_addition": {"result": 0}, "second_addition": {"result": 0}},
+        "second": {"double": {"value": 2}, "second_addition": {"result": 4}},
+    }
+
+
+def test_pipeline_pair_run_first_as_batch(first_pipeline, second_pipeline_batched):
+    pair = PipelinePair(
+        first=first_pipeline,
+        second=second_pipeline_batched,
+        outputs_to_inputs={"second_addition.result": ["first_addition.value"]},
+        included_first_outputs={"first_addition"},
+        included_second_outputs={"first_addition", "double"},
+        map_first_outputs=lambda x: aggregate_batched_pipeline_outputs(x),
+    )
+
+    results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)])
+    assert results == {
+        "first": {
+            "first_addition": {"result": [10, 11, 12, 13, 14]},
+            "second_addition": {"result": [120, 122, 124, 126, 128]},
+        },
+        "second": {
+            "first_addition": {"result": [121, 123, 125, 127, 129]},
+            "double": {"value": [242, 246, 250, 254, 258]},
+            "second_addition": {"result": [244, 248, 252, 256, 260]},
+        },
+    }
+
+
+def test_pipeline_pair_run_first_as_batch_invalid_map_first_outputs(first_pipeline, second_pipeline_batched):
+    pair = PipelinePair(
+        first=first_pipeline,
+        second=second_pipeline_batched,
+        outputs_to_inputs={"second_addition.result": ["first_addition.value"]},
+        included_first_outputs={"first_addition"},
+        included_second_outputs={"first_addition", "double"},
+        map_first_outputs=None,
+    )
+
+    with pytest.raises(ValueError, match="Mapping function for first pipeline outputs must be provided"):
+        results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)])
+
+    pair = PipelinePair(
+        first=first_pipeline,
+        second=second_pipeline_batched,
+        outputs_to_inputs={"second_addition.result": ["first_addition.value"]},
+        included_first_outputs={"first_addition"},
+        included_second_outputs={"first_addition", "double"},
+        map_first_outputs=lambda x: x,
+    )
+
+    with pytest.raises(ValueError, match="Mapping function must return an aggregate dictionary"):
+        results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)])
diff --git a/test/evaluation/test_util.py b/test/evaluation/test_util.py
new file mode 100644
index 00000000..e6d80d89
--- /dev/null
+++ b/test/evaluation/test_util.py
@@ -0,0 +1,54 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+import pytest
+
+from haystack_experimental.evaluation.util import (
+    aggregate_batched_pipeline_outputs,
+    deaggregate_batched_pipeline_inputs,
+)
+
+
+def test_aggregate_batched_pipeline_outputs_empty():
+    assert aggregate_batched_pipeline_outputs([]) == {}
+
+
+def test_aggregate_batched_pipeline_outputs_single():
+    assert aggregate_batched_pipeline_outputs([{"a": {"b": [1, 2]}}]) == {"a": {"b": [1, 2]}}
+
+
+def test_aggregate_batched_pipeline_outputs_multiple():
+    outputs = [{"a": {"b": [1, 2], "c": [10, 20]}}, {"a": {"b": [3, 4], "c": [30, 40]}}]
+    assert aggregate_batched_pipeline_outputs(outputs) == {"a": {"b": [[1, 2], [3, 4]], "c": [[10, 20], [30, 40]]}}
+
+
+def test_aggregate_batched_pipeline_outputs_mismatched_components():
+    outputs = [{"a": {"b": [1, 2]}}, {"c": {"b": [3, 4]}}]
+    with pytest.raises(ValueError, match="Expected components .* but got"):
+        aggregate_batched_pipeline_outputs(outputs)
+
+
+def test_aggregate_batched_pipeline_outputs_mismatched_component_outputs():
+    outputs = [{"a": {"b": [1, 2]}}, {"a": {"b": [3, 4], "c": [5, 6]}}]
+    with pytest.raises(ValueError, match="Expected outputs from component .* to have keys .* but got"):
+        aggregate_batched_pipeline_outputs(outputs)
+
+
+def test_deaggregate_batched_pipeline_inputs_empty():
+    assert deaggregate_batched_pipeline_inputs({}) == []
+
+
+def test_deaggregate_batched_pipeline_inputs_single():
+    inputs = {"a": {"b": [1, 2]}}
+    assert deaggregate_batched_pipeline_inputs(inputs) == [{"a": {"b": 1}}, {"a": {"b": 2}}]
+
+
+def test_deaggregate_batched_pipeline_inputs_multiple():
+    inputs = {"a": {"b": [1, 2], "c": [10, 20]}}
+    assert deaggregate_batched_pipeline_inputs(inputs) == [{"a": {"b": 1, "c": 10}}, {"a": {"b": 2, "c": 20}}]
+
+
+def test_deaggregate_batched_pipeline_inputs_shape_mismatch():
+    inputs = {"a": {"b": [1, 2]}, "c": {"b": [3]}}
+    with pytest.raises(ValueError, match="Expected input .* to have .* values but got"):
+        deaggregate_batched_pipeline_inputs(inputs)
From bdf1fce04c861a8d6f0a33370ed6a5e180552d40 Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Fri, 17 May 2024 11:03:02 +0200
Subject: [PATCH 2/7] Make `PipelinePair._split_input_output_path` a static method

---
 haystack_experimental/evaluation/pipeline_pair.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/haystack_experimental/evaluation/pipeline_pair.py b/haystack_experimental/evaluation/pipeline_pair.py
index 2a3e0dcd..18075867 100644
--- a/haystack_experimental/evaluation/pipeline_pair.py
+++ b/haystack_experimental/evaluation/pipeline_pair.py
@@ -82,7 +82,8 @@ def _validate_second_inputs(self, inputs: Dict[str, Dict[str, Any]]):
                     f"Second pipeline input '{component_name}.{input_name}' cannot be provided both explicitly and by the first pipeline."
                 )
 
-    def _split_input_output_path(self, path: str) -> Tuple[str, str]:
+    @staticmethod
+    def _split_input_output_path(path: str) -> Tuple[str, str]:
         # Split the input/output path into component name and input/output name.
         pos = path.find(".")
         if pos == -1:
From 78e81bb1d042019397cccbdf2dac68bfb88ec2d3 Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Fri, 17 May 2024 11:34:31 +0200
Subject: [PATCH 3/7] Clarify what `_prepare_reqd_outputs_for_first_pipeline` returns as its output

---
 haystack_experimental/evaluation/pipeline_pair.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/haystack_experimental/evaluation/pipeline_pair.py b/haystack_experimental/evaluation/pipeline_pair.py
index 18075867..dd73f75f 100644
--- a/haystack_experimental/evaluation/pipeline_pair.py
+++ b/haystack_experimental/evaluation/pipeline_pair.py
@@ -94,7 +94,10 @@ def _split_input_output_path(path: str) -> Tuple[str, str]:
 
     def _prepare_reqd_outputs_for_first_pipeline(self) -> Set[str]:
         # To ensure that we have all the outputs from the first
-        # pipeline that are required by the second pipeline.
+        # pipeline that are required by the second pipeline, we
+        # first collect all the keys in the first-to-second
+        # output-to-input mapping and then add the explicitly included
+        # first pipeline outputs.
         first_components_with_outputs = {self._split_input_output_path(p)[0] for p in self.outputs_to_inputs.keys()}
         if self.included_first_outputs is not None:
             first_components_with_outputs = first_components_with_outputs.union(self.included_first_outputs)
From 4473cb3826da314b43eb004eedd56c56bc0e29bd Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Wed, 22 May 2024 11:30:04 +0200
Subject: [PATCH 4/7] Add license header

---
 haystack_experimental/evaluation/pipeline_pair.py  | 4 ++++
 haystack_experimental/evaluation/util.py           | 4 ++++
 haystack_experimental/testing/sample_components.py | 4 ++++
 3 files changed, 12 insertions(+)

diff --git a/haystack_experimental/evaluation/pipeline_pair.py b/haystack_experimental/evaluation/pipeline_pair.py
index dd73f75f..c3148ddd 100644
--- a/haystack_experimental/evaluation/pipeline_pair.py
+++ b/haystack_experimental/evaluation/pipeline_pair.py
@@ -1,3 +1,7 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from dataclasses import dataclass
 from typing import Any, Callable, Dict, List, Optional, Set, Tuple
 
diff --git a/haystack_experimental/evaluation/util.py b/haystack_experimental/evaluation/util.py
index 59216dd6..3459ec08 100644
--- a/haystack_experimental/evaluation/util.py
+++ b/haystack_experimental/evaluation/util.py
@@ -1,3 +1,7 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from copy import deepcopy
 from typing import Any, Dict, List
 
diff --git a/haystack_experimental/testing/sample_components.py b/haystack_experimental/testing/sample_components.py
index b10d73ce..68689414 100644
--- a/haystack_experimental/testing/sample_components.py
+++ b/haystack_experimental/testing/sample_components.py
@@ -1,3 +1,7 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from typing import Optional, List
 from haystack.core.component import component
 
From 094ade852d72d0ef08ab9c8f65054ba51137ba57 Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Wed, 22 May 2024 11:44:41 +0200
Subject: [PATCH 5/7] Disable superfluous lints

---
 pyproject.toml | 27 +++++++++++----------------
 1 file changed, 11 insertions(+), 16 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 547d69d7..01ee888b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "haystack-experimental"
 dynamic = ["version"]
 description = "Experimental components and features for the Haystack LLM framework."
 readme = "README.md"
-license = {text = "Apache-2.0"}
+license = { text = "Apache-2.0" }
 requires-python = ">=3.8"
 authors = [{ name = "deepset.ai", email = "info@deepset.ai" }]
 classifiers = [
@@ -25,9 +25,7 @@ classifiers = [
     "Programming Language :: Python :: 3.12",
     "Topic :: Scientific/Engineering :: Artificial Intelligence",
 ]
-dependencies = [
-    "haystack-ai",
-]
+dependencies = ["haystack-ai"]
 [project.urls]
 "CI: GitHub" = "https://github.com/deepset-ai/haystack-experimental/actions"
 "GitHub: issues" = "https://github.com/deepset-ai/haystack-experimental/issues"
@@ -60,17 +58,11 @@ lint = [
     "pylint -ry -j 0 {args:haystack_experimental}",
 ]
 test-cov = "coverage run -m pytest {args:test}"
-cov-report = [
-    "- coverage combine",
-    "coverage xml",
-]
+cov-report = ["- coverage combine", "coverage xml"]
-cov = [
-    "test-cov",
-    "cov-report",
-]
+cov = ["test-cov", "cov-report"]
 
 [tool.hatch.envs.readme]
-detached = true # To avoid installing the dependencies from the default environment
+detached = true  # To avoid installing the dependencies from the default environment
 dependencies = ["haystack-pydoc-tools"]
 
 [tool.hatch.envs.readme.scripts]
@@ -99,15 +91,18 @@ ignore-paths = [
     "haystack_experimental/__init__.py",
     "haystack_experimental/version.py",
 ]
+
 [tool.pylint.'MESSAGES CONTROL']
 max-line-length = 120
+disable = [
+    "C0114", # missing-module-docstring
+    "R0903", # too-few-public-methods
+]
 
 [tool.pytest.ini_options]
 minversion = "6.0"
 addopts = "--strict-markers"
-markers = [
-    "integration: integration tests",
-]
+markers = ["integration: integration tests"]
 log_cli = true
 
 [tool.mypy]
From bd689ec83e7e5b16cb832c073695aa6fb145686b Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Wed, 22 May 2024 11:44:48 +0200
Subject: [PATCH 6/7] Fix lints

---
 .../evaluation/pipeline_pair.py          | 36 ++++++++++---------
 haystack_experimental/evaluation/util.py | 23 ++++++------
 .../testing/sample_components.py         |  3 +-
 3 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/haystack_experimental/evaluation/pipeline_pair.py b/haystack_experimental/evaluation/pipeline_pair.py
index c3148ddd..3a30c5ba 100644
--- a/haystack_experimental/evaluation/pipeline_pair.py
+++ b/haystack_experimental/evaluation/pipeline_pair.py
@@ -11,8 +11,7 @@
 @dataclass(frozen=True)
 class PipelinePair:
     """
-    A pair of pipelines that are linked together and
-    executed sequentially.
+    A pair of pipelines that are linked together and executed sequentially.
 
     :param first:
         The first pipeline in the sequence.
@@ -52,7 +51,7 @@ def __post_init__(self):
             first_comp_name, first_out_name = self._split_input_output_path(first_out)
             if first_comp_name not in first_outputs:
                 raise ValueError(f"Output component '{first_comp_name}' not found in first pipeline.")
-            elif first_out_name not in first_outputs[first_comp_name]:
+            if first_out_name not in first_outputs[first_comp_name]:
                 raise ValueError(
                     f"Component '{first_comp_name}' in first pipeline does not have expected output '{first_out_name}'."
                 )
@@ -66,9 +65,10 @@ def __post_init__(self):
                 second_comp_name, second_input_name = self._split_input_output_path(second_in)
                 if second_comp_name not in second_inputs:
                     raise ValueError(f"Input component '{second_comp_name}' not found in second pipeline.")
-                elif second_input_name not in second_inputs[second_comp_name]:
+                if second_input_name not in second_inputs[second_comp_name]:
                     raise ValueError(
-                        f"Component '{second_comp_name}' in second pipeline does not have expected input '{second_input_name}'."
+                        f"Component '{second_comp_name}' in second pipeline "
+                        f"does not have expected input '{second_input_name}'."
                     )
                 seen_second_inputs.add(second_in)
 
@@ -81,9 +81,10 @@ def _validate_second_inputs(self, inputs: Dict[str, Dict[str, Any]]):
             provided_input = inputs.get(component_name)
             if provided_input is None:
                 continue
-            elif input_name in provided_input:
+            if input_name in provided_input:
                 raise ValueError(
-                    f"Second pipeline input '{component_name}.{input_name}' cannot be provided both explicitly and by the first pipeline."
+                    f"Second pipeline input '{component_name}.{input_name}' cannot "
+                    "be provided both explicitly and by the first pipeline."
                 )
 
     @staticmethod
@@ -92,7 +93,8 @@ def _split_input_output_path(path: str) -> Tuple[str, str]:
         pos = path.find(".")
         if pos == -1:
             raise ValueError(
-                f"Invalid pipeline i/o path specifier '{path}' - Must be in the following format: <component_name>.<input/output_name>"
+                f"Invalid pipeline i/o path specifier '{path}' - Must be "
+                "in the following format: <component_name>.<input/output_name>"
             )
         return path[:pos], path[pos + 1 :]
 
@@ -135,10 +137,11 @@ def run(
         self, first_inputs: Dict[str, Dict[str, Any]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None
     ) -> Dict[str, Dict[str, Any]]:
         """
-        Execute the pipeline pair by invoking first pipeline
-        and then the second with the outputs of the former. This
-        assumes that both pipelines have the same input modality,
-        i.e., the shapes of the first pipeline's outputs match the
+        Execute the pipeline pair in sequence.
+
+        Invokes the first pipeline and then the second with the outputs
+        of the former. This assumes that both pipelines have the same input
+        modality, i.e., the shapes of the first pipeline's outputs match the
         shapes of the second pipeline's inputs.
 
         :param first_inputs:
@@ -167,10 +170,11 @@ def run_first_as_batch(
         self, first_inputs: List[Dict[str, Dict[str, Any]]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None
     ) -> Dict[str, Dict[str, Any]]:
         """
-        Execute the pipeline pair by invoking the first pipeline
-        iteratively over the list of inputs and passing the cumulative
-        outputs to the second pipeline. This is suitable when the first
-        pipeline has a single logical input-to-output mapping and the
+        Execute the pipeline pair in sequence.
+
+        Invokes the first pipeline iteratively over the list of inputs and
+        passes the cumulative outputs to the second pipeline. This is suitable
+        when the first pipeline has a single logical input-to-output mapping and the
         second pipeline expects multiple logical inputs, e.g. a retrieval
         pipeline that accepts a single query and returns a list of documents
         and an evaluation pipeline that accepts multiple lists of documents
diff --git a/haystack_experimental/evaluation/util.py b/haystack_experimental/evaluation/util.py
index 3459ec08..83efb535 100644
--- a/haystack_experimental/evaluation/util.py
+++ b/haystack_experimental/evaluation/util.py
@@ -8,9 +8,9 @@
 
 def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
     """
-    Combine the outputs of a pipeline that has been executed
-    iteratively over a batch of inputs. It performs a transpose
-    operation on the first and the third dimensions of the outputs.
+    Combine the outputs of a pipeline that has been executed iteratively over a batch of inputs.
+
+    Performs a transpose operation on the first and the third dimensions of the outputs.
 
     :param outputs:
         A list of outputs from the pipeline, where each output
@@ -24,7 +24,7 @@ def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]])
     # the batch input.
     if len(outputs) == 0:
         return {}
-    elif len(outputs) == 1:
+    if len(outputs) == 1:
        return outputs[0]
 
     # We'll use the first output as a sentinel to determine
@@ -32,13 +32,14 @@ def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]])
     sentinel = outputs[0]
     for output in outputs[1:]:
         if output.keys() != sentinel.keys():
-            raise ValueError(f"Expected components '{list(sentinel.keys())}' but got '{list(output.keys())}'")
+            raise ValueError(f"Expected components '{list(sentinel.keys())}' " f"but got '{list(output.keys())}'")
 
         for component_name, expected in sentinel.items():
             got = output[component_name]
             if got.keys() != expected.keys():
                 raise ValueError(
-                    f"Expected outputs from component '{component_name}' to have keys '{list(expected.keys())}' but got '{list(got.keys())}'"
+                    f"Expected outputs from component '{component_name}' to have "
+                    f"keys '{list(expected.keys())}' but got '{list(got.keys())}'"
                 )
 
     # The outputs are of the correct/same shape. Now to transpose
@@ -58,10 +59,9 @@ def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]])
 
 def deaggregate_batched_pipeline_inputs(inputs: Dict[str, Dict[str, List[Any]]]) -> List[Dict[str, Dict[str, Any]]]:
     """
-    Separate the inputs of a pipeline that has been batched along
-    its innermost dimension (component -> input -> values). It
-    performs a transpose operation on the first and the third dimensions
-    of the inputs.
+    Separate the inputs of a pipeline that has been batched along its innermost dimension.
+
+    Performs a transpose operation on the first and the third dimensions of the inputs.
 
     :param inputs:
         A dictionary of pipeline inputs that maps
@@ -79,7 +79,8 @@ def deaggregate_batched_pipeline_inputs(inputs: Dict[str, Dict[str, List[Any]]])
         for input_name, input_values in component_inputs.items():
             if len(input_values) != len(sentinel):
                 raise ValueError(
-                    f"Expected input '{component_name}.{input_name}' to have {len(sentinel)} values but got {len(input_values)}"
+                    f"Expected input '{component_name}.{input_name}' to have "
+                    f"{len(sentinel)} values but got {len(input_values)}"
                 )
diff --git a/haystack_experimental/testing/sample_components.py b/haystack_experimental/testing/sample_components.py
index 68689414..60e8d33a 100644
--- a/haystack_experimental/testing/sample_components.py
+++ b/haystack_experimental/testing/sample_components.py
@@ -2,7 +2,8 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from typing import Optional, List
+from typing import List, Optional
+
 from haystack.core.component import component
 
From 955377ff36e80780f17996f87aea0ea83aeb4127 Mon Sep 17 00:00:00 2001
From: shadeMe
Date: Wed, 22 May 2024 11:47:39 +0200
Subject: [PATCH 7/7] `mypy` fix

---
 haystack_experimental/evaluation/util.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/haystack_experimental/evaluation/util.py b/haystack_experimental/evaluation/util.py
index 83efb535..fe48516b 100644
--- a/haystack_experimental/evaluation/util.py
+++ b/haystack_experimental/evaluation/util.py
@@ -72,8 +72,10 @@ def deaggregate_batched_pipeline_inputs(inputs: Dict[str, Dict[str, List[Any]]])
     if len(inputs) == 0:
         return []
 
-    sentinel = next(iter(inputs.values()))  # First component's inputs
-    sentinel = next(iter(sentinel.values()))  # First component's first input's values
+    # First component's inputs
+    sentinel = next(iter(inputs.values()))
+    # First component's first input's values
+    sentinel = next(iter(sentinel.values()))  # type: ignore
 
     for component_name, component_inputs in inputs.items():
         for input_name, input_values in component_inputs.items():