Merge pull request #128 from sushruth2003/adaptive-batch
Better auto batching for resolve LLM calls
shreyashankar authored Oct 28, 2024
2 parents 404dbdd + aab2afb commit 32cd39d
Showing 48 changed files with 2,633 additions and 1,634 deletions.
17 changes: 16 additions & 1 deletion README.md
@@ -1,6 +1,9 @@
# DocETL: Powering Complex Document Processing Pipelines

[Website (Includes Demo)](https://docetl.com) | [Documentation](https://ucbepic.github.io/docetl) | [Discord](https://discord.gg/fHp7B2X3xx) | [NotebookLM Podcast](https://notebooklm.google.com/notebook/ef73248b-5a43-49cd-9976-432d20f9fa4f/audio?pli=1) (thanks Shabie from our Discord community!) | Paper (coming soon!)
[![Website](https://img.shields.io/badge/Website-docetl.org-blue)](https://docetl.org)
[![Documentation](https://img.shields.io/badge/Documentation-docs-green)](https://ucbepic.github.io/docetl)
[![Discord](https://img.shields.io/discord/1285485891095236608?label=Discord&logo=discord)](https://discord.gg/fHp7B2X3xx)
[![Paper](https://img.shields.io/badge/Paper-arXiv-red)](https://arxiv.org/abs/2410.12189)

![DocETL Figure](docs/assets/readmefig.png)

@@ -16,6 +19,18 @@ DocETL is the ideal choice when you're looking to maximize correctness and output
- You're working with long documents that don't fit into a single prompt or are too lengthy for effective LLM reasoning
- You have validation criteria and want tasks to automatically retry when the validation fails

## Cool Things People Are Doing with DocETL

- [Conversation Generator](https://github.com/redhog/docetl-conversation)
- [Text-to-speech](https://github.com/redhog/docetl-speaker)
- [YouTube Transcript Topic Extraction](https://github.com/rajib76/docetl_examples)

## (Educational) Threads

- [UI/UX Thoughts](https://x.com/sh_reya/status/1846235904664273201)
- [Using Gleaning to Improve Output Quality](https://x.com/sh_reya/status/1843354256335876262)
- [Deep Dive on Resolve Operator](https://x.com/sh_reya/status/1840796824636121288)

## Installation

See the documentation for installing from PyPI.
33 changes: 19 additions & 14 deletions docetl/api.py
@@ -64,17 +64,17 @@
FilterOp,
GatherOp,
MapOp,
ReduceOp,
ResolveOp,
SplitOp,
UnnestOp,
ClusterOp,
SampleOp,
OpType,
ParallelMapOp,
ParsingTool,
)
from docetl.schemas import (
PipelineOutput,
PipelineStep,
ReduceOp,
ResolveOp,
SplitOp,
UnnestOp,
)


@@ -197,17 +197,13 @@ def optimize(
Pipeline: An optimized version of the pipeline.
"""
config = self._to_dict()
optimizer = Optimizer(
runner = DSLRunner(
config,
base_name=os.path.join(os.getcwd(), self.name),
yaml_file_suffix=self.name,
max_threads=max_threads,
model=model,
timeout=timeout,
resume=resume,
)
optimizer.optimize()
optimized_config = optimizer.clean_optimized_config()
optimized_config = runner.optimize(return_pipeline=False)

updated_pipeline = Pipeline(
name=self.name,
@@ -232,8 +228,13 @@ def run(self, max_threads: Optional[int] = None) -> float:
float: The total cost of running the pipeline.
"""
config = self._to_dict()
runner = DSLRunner(config, max_threads=max_threads)
result = runner.run()
runner = DSLRunner(
config,
base_name=os.path.join(os.getcwd(), self.name),
yaml_file_suffix=self.name,
max_threads=max_threads,
)
result = runner.load_run_save()
return result

def to_yaml(self, path: str) -> None:
@@ -322,6 +323,10 @@ def _update_from_dict(self, config: Dict[str, Any]):
self.operations.append(GatherOp(**op, type=op_type))
elif op_type == "unnest":
self.operations.append(UnnestOp(**op, type=op_type))
elif op_type == "cluster":
self.operations.append(ClusterOp(**op, type=op_type))
elif op_type == "sample":
self.operations.append(SampleOp(**op, type=op_type))
self.steps = [PipelineStep(**step) for step in config["pipeline"]["steps"]]
self.output = PipelineOutput(**config["pipeline"]["output"])
self.default_model = config.get("default_model")
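Both `Pipeline.optimize()` and `Pipeline.run()` now construct a `DSLRunner` (with `base_name` and `yaml_file_suffix` derived from the pipeline name) instead of instantiating `Optimizer` directly. A minimal usage sketch of the updated entry points — the dataset, prompt, and operation names are illustrative, and the `Dataset`/`MapOp` constructors are assumed to follow the schemas used elsewhere in docetl:

```python
import os

from docetl.api import Dataset, MapOp, Pipeline, PipelineOutput, PipelineStep

# Hypothetical one-step pipeline; only the run()/optimize() delegation to
# DSLRunner is taken from this commit.
pipeline = Pipeline(
    name="summaries",
    datasets={"articles": Dataset(type="file", path="articles.json")},
    operations=[
        MapOp(
            name="summarize",
            type="map",
            prompt="Summarize the following article: {{ input.text }}",
            output={"schema": {"summary": "string"}},
        )
    ],
    steps=[
        PipelineStep(name="summarize_step", input="articles", operations=["summarize"]),
    ],
    output=PipelineOutput(type="file", path=os.path.join(os.getcwd(), "summaries.json")),
    default_model="gpt-4o-mini",
)

# run() builds DSLRunner(config, base_name=..., yaml_file_suffix=..., max_threads=...)
# and calls load_run_save(); optimize() calls runner.optimize(return_pipeline=False).
total_cost = pipeline.run(max_threads=4)
optimized_pipeline = pipeline.optimize()
```

As in the docstrings above, `run()` returns the total cost of the run, while `optimize()` returns an optimized version of the pipeline.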
128 changes: 128 additions & 0 deletions docetl/base_schemas.py
@@ -0,0 +1,128 @@
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Field, field_validator

# from ..operations import map
# MapOp = map.MapOperation.schema

class ToolFunction(BaseModel):
name: str
description: str
parameters: Dict[str, Any]


class Tool(BaseModel):
code: str
function: ToolFunction


class ParsingTool(BaseModel):
"""
Represents a parsing tool used for custom data parsing in the pipeline.
Attributes:
name (str): The name of the parsing tool. This should be unique within the pipeline configuration.
function_code (str): The Python code defining the parsing function. This code will be executed
to parse the input data according to the specified logic. It should return a list of strings, where each string is its own document.
Example:
```yaml
parsing_tools:
- name: ocr_parser
function_code: |
import pytesseract
from pdf2image import convert_from_path
def ocr_parser(filename: str) -> List[str]:
images = convert_from_path(filename)
text = ""
for image in images:
text += pytesseract.image_to_string(image)
return [text]
```
"""

name: str
function_code: str


class PipelineStep(BaseModel):
"""
Represents a step in the pipeline.
Attributes:
name (str): The name of the step.
operations (List[Union[Dict[str, Any], str]]): A list of operations to be applied in this step.
Each operation can be either a string (the name of the operation) or a dictionary
(for more complex configurations).
input (Optional[str]): The input for this step. It can be either the name of a dataset
or the name of a previous step. If not provided, the step will use the output
of the previous step as its input.
Example:
```python
# Simple step with a single operation
process_step = PipelineStep(
name="process_step",
input="my_dataset",
operations=["process"]
)
# Step with multiple operations
summarize_step = PipelineStep(
name="summarize_step",
input="process_step",
operations=["summarize"]
)
# Step with a more complex operation configuration
custom_step = PipelineStep(
name="custom_step",
input="previous_step",
operations=[
{
"custom_operation": {
"model": "gpt-4",
"prompt": "Perform a custom analysis on the following text:"
}
}
]
)
```
These examples show different ways to configure pipeline steps, from simple
single-operation steps to more complex configurations with custom parameters.
"""

name: str
operations: List[Union[Dict[str, Any], str]]
input: Optional[str] = None

class PipelineOutput(BaseModel):
"""
Represents the output configuration for a pipeline.
Attributes:
type (str): The type of output. This could be 'file', 'database', etc.
path (str): The path where the output will be stored. This could be a file path,
database connection string, etc., depending on the type.
intermediate_dir (Optional[str]): The directory to store intermediate results,
if applicable. Defaults to None.
Example:
```python
output = PipelineOutput(
type="file",
path="/path/to/output.json",
intermediate_dir="/path/to/intermediate/results"
)
```
"""

type: str
path: str
intermediate_dir: Optional[str] = None


class PipelineSpec(BaseModel):
steps: list[PipelineStep]
output: PipelineOutput
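`PipelineSpec` simply bundles a list of steps with the output configuration. A small sketch of constructing one from the models defined above (step names, operation names, and paths are illustrative):

```python
from docetl.base_schemas import PipelineOutput, PipelineStep, PipelineSpec

# Two hypothetical steps: the first reads a named dataset; the second omits
# `input`, so it consumes the previous step's output.
spec = PipelineSpec(
    steps=[
        PipelineStep(name="extract", input="articles", operations=["extract_entities"]),
        PipelineStep(name="dedupe", operations=["resolve_entities"]),
    ],
    output=PipelineOutput(
        type="file",
        path="results/entities.json",
        intermediate_dir="results/intermediate",
    ),
)

print(spec.model_dump())
```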
56 changes: 22 additions & 34 deletions docetl/builder.py
@@ -77,26 +77,11 @@ def items(self):
return [(key, self[key]) for key in self.keys()]


class Optimizer(ConfigWrapper):
@classmethod
def from_yaml(cls, yaml_file: str, **kwargs):
# check that file ends with .yaml or .yml
if not yaml_file.endswith(".yaml") and not yaml_file.endswith(".yml"):
raise ValueError(
"Invalid file type. Please provide a YAML file ending with '.yaml' or '.yml'."
)

base_name = yaml_file.rsplit(".", 1)[0]
suffix = yaml_file.split("/")[-1].split(".")[0]
return super(Optimizer, cls).from_yaml(
yaml_file, base_name=base_name, yaml_file_suffix=suffix, **kwargs
)
class Optimizer:

def __init__(
self,
config: Dict,
base_name: str,
yaml_file_suffix: str,
runner: "DSLRunner",
max_threads: Optional[int] = None,
model: str = "gpt-4o",
resume: bool = False,
@@ -136,7 +121,16 @@ def __init__(
The method also calls print_optimizer_config() to display the initial configuration.
"""
ConfigWrapper.__init__(self, config, max_threads)
self.config = runner.config
self.console = runner.console
self.max_threads = runner.max_threads

self.base_name = runner.base_name
self.yaml_file_suffix = runner.yaml_file_suffix
self.config = runner.config
self.runner = runner
self.status = runner.status

self.optimized_config = copy.deepcopy(self.config)
self.llm_client = LLMClient(model)
self.operations_cost = 0
@@ -145,17 +139,11 @@
self.samples_taken = defaultdict(dict)
self.resume = resume

# create parsing tool map
self.parsing_tool_map = create_parsing_tool_map(
self.config.get("parsing_tools", None)
)

home_dir = os.path.expanduser("~")
cache_dir = os.path.join(home_dir, f".docetl/cache/{yaml_file_suffix}")
cache_dir = os.path.join(home_dir, f".docetl/cache/{runner.yaml_file_suffix}")
os.makedirs(cache_dir, exist_ok=True)
self.datasets = DatasetOnDisk(dir=cache_dir, console=self.console)
self.optimized_ops_path = f"{cache_dir}/optimized_ops"
self.optimized_config_path = f"{base_name}_opt.yaml"

# Update sample size map
self.sample_size_map = SAMPLE_SIZE_MAP
@@ -191,7 +179,7 @@ def syntax_check(self):
try:
operation_class = get_operation(operation_type)
operation_class(
self,
self.runner,
operation_config,
self.config.get("default_model", "gpt-4o-mini"),
self.max_threads,
@@ -976,7 +964,7 @@ def _get_sample_data(
type=dataset_config["type"],
path_or_data=dataset_config["path"],
parsing=dataset_config.get("parsing", []),
user_defined_parsing_tool_map=self.parsing_tool_map,
user_defined_parsing_tool_map=self.runner.parsing_tool_map,
)
data = dataset.load()

@@ -1115,7 +1103,7 @@ def _optimize_reduce(
List[Dict[str, Any]]: The optimized operation configuration.
"""
reduce_optimizer = ReduceOptimizer(
self,
self.runner,
self.config,
self.console,
self.llm_client,
@@ -1158,7 +1146,7 @@ def _optimize_equijoin(
new_right_name = right_name
for _ in range(max_iterations):
join_optimizer = JoinOptimizer(
self,
self.runner,
self.config,
op_config,
self.console,
@@ -1310,7 +1298,7 @@ def _optimize_resolve(
List[Dict[str, Any]]: The optimized operation configuration.
"""
optimized_config, cost = JoinOptimizer(
self,
self.runner,
self.config,
op_config,
self.console,
@@ -1358,7 +1346,7 @@ def _run_operation(
operation_class = get_operation(op_config["type"])

oc_kwargs = {
"runner": self,
"runner": self.runner,
"config": op_config,
"default_model": self.config["default_model"],
"max_threads": self.max_threads,
@@ -1418,7 +1406,7 @@ def clean_optimized_config(self):

return resolved_config

def save_optimized_config(self):
def save_optimized_config(self, optimized_config_path: str):
"""
Save the optimized configuration to a YAML file.
@@ -1427,10 +1415,10 @@ def save_optimized_config(self):
"""
resolved_config = self.clean_optimized_config()

with open(self.optimized_config_path, "w") as f:
with open(optimized_config_path, "w") as f:
yaml.safe_dump(resolved_config, f, default_flow_style=False, width=80)
self.console.log(
f"[green italic]💾 Optimized config saved to {self.optimized_config_path}[/green italic]"
f"[green italic]💾 Optimized config saved to {optimized_config_path}[/green italic]"
)


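With this refactor the `Optimizer` no longer subclasses `ConfigWrapper` or parses its own YAML; it borrows the config, console, status, and parsing-tool map from the `DSLRunner` that drives it, and `save_optimized_config()` now takes the output path explicitly. A rough sketch of the new flow, assuming `DSLRunner` is importable from `docetl.runner` and accepts the keyword arguments shown in the `api.py` diff above (the YAML and output paths are hypothetical):

```python
import yaml

from docetl.builder import Optimizer
from docetl.runner import DSLRunner  # assumed module path for DSLRunner

# Hypothetical pipeline configuration loaded from disk.
with open("pipeline.yaml") as f:
    config = yaml.safe_load(f)

runner = DSLRunner(
    config,
    base_name="pipeline",         # hypothetical
    yaml_file_suffix="pipeline",  # hypothetical
    max_threads=4,
)

# The optimizer reuses the runner's config, console, status, and parsing tools,
# and the caller decides where the optimized YAML is written.
optimizer = Optimizer(runner, model="gpt-4o", resume=False)
optimizer.optimize()
optimizer.save_optimized_config("pipeline_opt.yaml")
```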
Diffs for the remaining 44 changed files are not shown.
