diff --git a/docetl/dataset.py b/docetl/dataset.py index c3be07b5..9d53753c 100644 --- a/docetl/dataset.py +++ b/docetl/dataset.py @@ -148,20 +148,14 @@ def _validate_parsing( for tool in parsing_tools: if ( not isinstance(tool, dict) - or "input_key" not in tool or "function" not in tool - or "output_key" not in tool ): raise ValueError( - "Each parsing tool must be a dictionary with 'input_key', 'function', and 'output_key' keys" + "Each parsing tool must be a dictionary with a 'function' key and any arguments required by that function" ) - if ( - not isinstance(tool["input_key"], str) - or not isinstance(tool["function"], str) - or not isinstance(tool["output_key"], str) - ): + if not isinstance(tool["function"], str): raise ValueError( - "'input_key', 'function', and 'output_key' in parsing tools must be strings" + "'function' in parsing tools must be a string" ) if "function_kwargs" in tool and not isinstance( tool["function_kwargs"], dict @@ -213,19 +207,12 @@ def load(self) -> List[Dict]: def _process_item( self, item: Dict[str, Any], - input_key: str, - output_key: str, func: Callable, **function_kwargs: Dict[str, Any], ): - if input_key not in item: - raise ValueError(f"Input key {input_key} not found in item: {item}") - result = func(item[input_key], **function_kwargs) - if isinstance(result, list): - return [item.copy() | {output_key: res} for res in result] - else: - return [item | {output_key: result}] - + result = func(item, **function_kwargs) + return [item.copy() | res for res in result] + def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: """ Apply parsing tools to the data. @@ -240,7 +227,13 @@ def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: ValueError: If a parsing tool is not found or if an input key is missing from an item. """ for tool in self.parsing: - input_key = tool["input_key"] + function_kwargs = dict(tool) + function_kwargs.pop("function") + # FIXME: The following is just for backwards compatibility + # with the existing yaml format... + if "function_kwargs" in function_kwargs: + function_kwargs.update(function_kwargs.pop("function_kwargs")) + try: func = get_parser(tool["function"]) except KeyError: @@ -261,8 +254,6 @@ def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: f"Parsing tool {tool['function']} not found. Please define it or use one of our existing parsing tools: {get_parsing_tools()}" ) - output_key = tool["output_key"] - function_kwargs = tool.get("function_kwargs", {}) new_data = [] with ThreadPoolExecutor() as executor: @@ -270,8 +261,6 @@ def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: executor.submit( self._process_item, item, - input_key, - output_key, func, **function_kwargs, ) diff --git a/docetl/operations/map.py b/docetl/operations/map.py index ed9dd4e1..3f077713 100644 --- a/docetl/operations/map.py +++ b/docetl/operations/map.py @@ -171,6 +171,7 @@ def validation_fn(response: Dict[str, Any]): max_retries_per_timeout=self.config.get( "max_retries_per_timeout", 2 ), + verbose=self.config.get("verbose", False), ), validation_fn=validation_fn, val_rule=self.config.get("validate", []), diff --git a/docetl/operations/reduce.py b/docetl/operations/reduce.py index 30d2373b..b4091865 100644 --- a/docetl/operations/reduce.py +++ b/docetl/operations/reduce.py @@ -273,6 +273,11 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: Returns: Tuple[List[Dict], float]: A tuple containing the processed results and the total cost of the operation. """ + if self.config.get("gleaning", {}).get("validation_prompt", None): + self.console.log( + f"Using gleaning with validation prompt: {self.config.get('gleaning', {}).get('validation_prompt', '')}" + ) + reduce_keys = self.config["reduce_key"] if isinstance(reduce_keys, str): reduce_keys = [reduce_keys] @@ -860,6 +865,7 @@ def _batch_reduce( console=self.console, timeout_seconds=self.config.get("timeout", 120), max_retries_per_timeout=self.config.get("max_retries_per_timeout", 2), + verbose=self.config.get("verbose", False), ) item_cost += gleaning_cost else: diff --git a/docetl/operations/split.py b/docetl/operations/split.py index 78444b95..8b9f26a3 100644 --- a/docetl/operations/split.py +++ b/docetl/operations/split.py @@ -51,9 +51,15 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: split_key = self.config["split_key"] method = self.config["method"] method_kwargs = self.config["method_kwargs"] - encoder = tiktoken.encoding_for_model( - self.config["method_kwargs"].get("model", self.default_model).split("/")[-1] - ) + try: + encoder = tiktoken.encoding_for_model( + self.config["method_kwargs"] + .get("model", self.default_model) + .split("/")[-1] + ) + except Exception: + encoder = tiktoken.encoding_for_model("gpt-4o") + results = [] cost = 0.0 diff --git a/docetl/operations/utils.py b/docetl/operations/utils.py index 88553ea7..f4ce3c48 100644 --- a/docetl/operations/utils.py +++ b/docetl/operations/utils.py @@ -359,7 +359,11 @@ def safe_eval(expression: str, output: Dict) -> bool: # Safely evaluate the expression return bool(aeval(expression)) except Exception: - return False + # try to evaluate with python eval + try: + return bool(eval(expression, locals={"output": output})) + except Exception: + return False class APIWrapper(object): @@ -720,6 +724,7 @@ def call_llm_with_gleaning( console: Console = Console(), timeout_seconds: int = 120, max_retries_per_timeout: int = 2, + verbose: bool = False, ) -> Tuple[str, float]: """ Call LLM with a gleaning process, including validation and improvement rounds. @@ -789,7 +794,7 @@ def call_llm_with_gleaning( # Call LLM for validation self.runner.rate_limiter.try_acquire("llm_call", weight=1) validator_response = completion( - model="gpt-4o-mini", + model=model, messages=truncate_messages( messages + [{"role": "user", "content": validator_prompt}], model ), @@ -817,9 +822,10 @@ def call_llm_with_gleaning( if not suggestion["should_refine"]: break - # console.log( - # f"Validator improvements (gleaning round {rnd + 1}): {suggestion['improvements']}" - # ) + if verbose: + console.log( + f"Validator improvements (gleaning round {rnd + 1}): {suggestion['improvements']}" + ) # Prompt for improvement improvement_prompt = f"""Based on the validation feedback: @@ -1166,4 +1172,4 @@ def rich_as_completed(futures, total=None, desc=None, leave=True, console=None): with RichLoopBar(total=total, desc=desc, leave=leave, console=console) as pbar: for future in as_completed(futures): yield future - pbar.update() \ No newline at end of file + pbar.update() diff --git a/docetl/optimizers/reduce_optimizer.py b/docetl/optimizers/reduce_optimizer.py index 67727dec..84518aa1 100644 --- a/docetl/optimizers/reduce_optimizer.py +++ b/docetl/optimizers/reduce_optimizer.py @@ -1305,7 +1305,7 @@ def _calculate_compression_ratio( reduce_key = op_config["reduce_key"] input_schema = op_config.get("input", {}).get("schema", {}) output_schema = op_config["output"]["schema"] - model = op_config.get("model", "gpt-4o") + model = op_config.get("model", "gpt-4o-mini") compression_ratios = {} diff --git a/docetl/parsing_tools.py b/docetl/parsing_tools.py index 208fa07c..86014abd 100644 --- a/docetl/parsing_tools.py +++ b/docetl/parsing_tools.py @@ -1,31 +1,49 @@ import importlib import io import os -from typing import Dict, List, Optional - - -def llama_index_simple_directory_reader(filename: str) -> List[str]: +from typing import Dict, List, Optional, Any + +def with_input_output_key(fn): + """Decorator that wraps a parser function that takes a single + string parameter and return list of strings and makes it a full + parser function that takes an item as a dictionary and return a + list of dictionaries.""" + def wrapper(item, input_key="text", output_key="text", **kw): + if input_key not in item: + raise ValueError(f"Input key {input_key} not found in item: {item}") + result = fn(item[input_key], **kw) + if not isinstance(result, list): + result = [result] + return [{output_key: res} for res in result] + return wrapper + +def llama_index_simple_directory_reader(item: dict[str, Any], input_key: str ="path") -> List[dict[str, Any]]: from llama_index.core import SimpleDirectoryReader - documents = SimpleDirectoryReader(filename).load_data() - # FIXME: What about doc.metadata? Would be good to include that too... - return [doc.text for doc in documents] + documents = SimpleDirectoryReader(item[input_key]).load_data() + return [{"text": doc.text, + "metadata": doc.metadata} + for doc in documents] -def llama_index_wikipedia_reader(filename: str) -> List[str]: +def llama_index_wikipedia_reader(item: dict[str, Any], input_key: str = "pages") -> List[dict[str, Any]]: from llama_index.readers.wikipedia import WikipediaReader loader = WikipediaReader() - pages = [filename] + pages = item[input_key] + if not isinstance(pages, list): + pages = [pages] documents = loader.load_data(pages=pages, auto_suggest=False) # The wikipedia reader does not include the page url in the metadata, which is impractical... for name, doc in zip(pages, documents): doc.metadata["source"] = "https://en.wikipedia.org/wiki/" + name - # FIXME: What about doc.metadata? Would be good to include that too... - return [doc.text for doc in documents] + return [{"text": doc.text, + "metadata": doc.metadata} + for doc in documents] +@with_input_output_key def whisper_speech_to_text(filename: str) -> List[str]: """ Transcribe speech from an audio file to text using Whisper model via litellm. @@ -72,6 +90,7 @@ def whisper_speech_to_text(filename: str) -> List[str]: return [response.text] +@with_input_output_key def xlsx_to_string( filename: str, orientation: str = "col", @@ -128,6 +147,7 @@ def process_sheet(sheet): return [process_sheet(wb.active)] +@with_input_output_key def txt_to_string(filename: str) -> List[str]: """ Read the content of a text file and return it as a list of strings (only one element). @@ -142,6 +162,7 @@ def txt_to_string(filename: str) -> List[str]: return [file.read()] +@with_input_output_key def docx_to_string(filename: str) -> List[str]: """ Extract text from a Word document. @@ -158,6 +179,7 @@ def docx_to_string(filename: str) -> List[str]: return ["\n".join([paragraph.text for paragraph in doc.paragraphs])] +@with_input_output_key def pptx_to_string(filename: str, doc_per_slide: bool = False) -> List[str]: """ Extract text from a PowerPoint presentation. @@ -195,6 +217,7 @@ def pptx_to_string(filename: str, doc_per_slide: bool = False) -> List[str]: return result +@with_input_output_key def azure_di_read( filename: str, use_url: bool = False, @@ -334,6 +357,7 @@ def azure_di_read( ] +@with_input_output_key def paddleocr_pdf_to_string( input_path: str, doc_per_page: bool = False, @@ -399,6 +423,7 @@ def paddleocr_pdf_to_string( return pdf_content +@with_input_output_key def gptpdf_to_string( input_path: str, gpt_model: str, diff --git a/docetl/utils.py b/docetl/utils.py index a1336b1d..61f95379 100644 --- a/docetl/utils.py +++ b/docetl/utils.py @@ -120,7 +120,10 @@ def truncate_sample_data( remaining_tokens = available_tokens - current_tokens # Encode the value - encoder = tiktoken.encoding_for_model(model) + try: + encoder = tiktoken.encoding_for_model(model) + except Exception: + encoder = tiktoken.encoding_for_model("gpt-4o") encoded_value = encoder.encode(str(data[key])) # Calculate how many tokens to keep diff --git a/docs/concepts/operators.md b/docs/concepts/operators.md index f81653c7..fc9070ef 100644 --- a/docs/concepts/operators.md +++ b/docs/concepts/operators.md @@ -135,7 +135,7 @@ validate: - all(len(insight["supporting_actions"]) >= 1 for insight in output["insights"]) ``` -Access variables using dictionary syntax: `input["field"]` or `output["field"]`. +Access variables using dictionary syntax: `output["field"]`. Note that you can't access `input` docs in validation, but the output docs should have all the fields from the input docs (for non-reduce operations), since fields pass through unchanged. The `num_retries_on_validate_failure` attribute specifies how many times to retry the LLM if any validation statements fail. diff --git a/docs/examples/custom-parsing.md b/docs/examples/custom-parsing.md index 5f62d1ae..a8959f97 100644 --- a/docs/examples/custom-parsing.md +++ b/docs/examples/custom-parsing.md @@ -35,10 +35,11 @@ To use custom parsing, you need to define parsing tools in your DocETL configura parsing_tools: - name: top_products_report function_code: | - def top_products_report(filename: str) -> List[str]: + def top_products_report(document: Dict) -> List[Dict]: import pandas as pd # Read the Excel file + filename = document["excel_path"] df = pd.read_excel(filename) # Calculate total sales @@ -61,7 +62,10 @@ parsing_tools: mom_growth.to_string() ] - return ["\n".join(report)] + # Return a list of dicts representing the output + # The input document will be merged into each output doc, + # so we can access all original fields from the input doc. + return [{"sales_analysis": "\n".join(report)}] datasets: sales_reports: @@ -69,9 +73,7 @@ datasets: source: local path: "sales_data/sales_paths.json" parsing: - - input_key: excel_path - function: top_products_report - output_key: sales_analysis + - function: top_products_report receipts: type: file @@ -81,9 +83,8 @@ datasets: - input_key: pdf_path function: paddleocr_pdf_to_string output_key: receipt_text - function_kwargs: - ocr_enabled: true - lang: "en" + ocr_enabled: true + lang: "en" ``` In this configuration: @@ -111,8 +112,6 @@ pipeline: This pipeline will use the parsed data from both Excel files and PDFs for further processing. - - ### How Data Gets Parsed and Formatted When you run your DocETL pipeline, the parsing tools you've specified in your configuration file are applied to the external files referenced in your dataset JSONs. Here's what happens: @@ -205,45 +204,45 @@ When you run this command: DocETL provides several built-in parsing tools to handle common file formats and data processing tasks. These tools can be used directly in your configuration by specifying their names in the `function` field of your parsing tools configuration. Here's an overview of the available built-in parsing tools: ::: docetl.parsing_tools.xlsx_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.txt_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.docx_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.whisper_speech_to_text - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.pptx_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.azure_di_read - options: - heading_level: 3 - show_root_heading: true + options: + heading_level: 3 + show_root_heading: true ::: docetl.parsing_tools.paddleocr_pdf_to_string - options: - heading_level: 3 - show_root_heading: true + options: + heading_level: 3 + show_root_heading: true ### Using Function Arguments with Parsing Tools -When using parsing tools in your DocETL configuration, you can pass additional arguments to the parsing functions using the function_kwargs field. This allows you to customize the behavior of the parsing tools without modifying their implementation. +When using parsing tools in your DocETL configuration, you can pass additional arguments to the parsing functions. -For example, when using the xlsx_to_string parsing tool, you can specify options like the orientation of the data, the order of columns, or whether to process each sheet separately. Here's an example of how to use function_kwargs in your configuration: +For example, when using the xlsx_to_string parsing tool, you can specify options like the orientation of the data, the order of columns, or whether to process each sheet separately. Here's an example of how to use such kwargs in your configuration: ```yaml datasets: @@ -254,10 +253,9 @@ datasets: parsing_tools: - name: excel_parser function: xlsx_to_string - function_kwargs: - orientation: row - col_order: ["Date", "Product", "Quantity", "Price"] - doc_per_sheet: true + orientation: row + col_order: ["Date", "Product", "Quantity", "Price"] + doc_per_sheet: true ``` ## Contributing Built-in Parsing Tools @@ -285,7 +283,7 @@ While DocETL provides several built-in parsing tools, the community can always b If the built-in tools don't meet your needs, you can create your own custom parsing tools. Here's how: 1. Define your parsing function in the `parsing_tools` section of your configuration. -2. Ensure your function takes a filename as input and returns a list of strings. +2. Ensure your function takes a document (dict) as input and returns a list of documents (dicts). 3. Use your custom parser in the `parsing` section of your dataset configuration. For example: @@ -294,7 +292,7 @@ For example: parsing_tools: - name: my_custom_parser function_code: | - def my_custom_parser(filename: str) -> List[str]: + def my_custom_parser(document: Dict) -> List[Dict]: # Your custom parsing logic here return [processed_data] @@ -304,7 +302,5 @@ datasets: source: local path: "data/paths.json" parsing: - - input_key: file_path - function: my_custom_parser - output_key: processed_data -``` \ No newline at end of file + - function: my_custom_parser +``` diff --git a/tests/basic/test_pipeline_with_parsing.py b/tests/basic/test_pipeline_with_parsing.py index bd524998..03ad15e4 100644 --- a/tests/basic/test_pipeline_with_parsing.py +++ b/tests/basic/test_pipeline_with_parsing.py @@ -1,4 +1,4 @@ -from typing import List +from typing import Dict, List import pytest import json import os @@ -129,9 +129,9 @@ def test_pipeline_with_parsing(config_file): os.remove(sample_data_file.name) -def custom_exploder(text: str) -> List[str]: - - return [t for t in text] +def custom_exploder(doc: Dict) -> List[Dict]: + text = doc["text"] + return [{"text": t} for t in text] def test_pipeline_with_custom_parsing(): @@ -160,9 +160,7 @@ def test_pipeline_with_custom_parsing(): path=tmp_input.name, parsing=[ { - "input_key": "text", "function": "custom_exploder", - "output_key": "parsed_content", } ], ) diff --git a/tests/test_api.py b/tests/test_api.py index 55b2c6b7..921c2de9 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -227,7 +227,9 @@ def test_pipeline_optimization( default_model="gpt-4o-mini", ) - optimized_pipeline = pipeline.optimize(max_threads=4, model="gpt-4o", timeout=10) + optimized_pipeline = pipeline.optimize( + max_threads=4, model="gpt-4o-mini", timeout=10 + ) assert isinstance(optimized_pipeline, Pipeline) assert len(optimized_pipeline.operations) == len(pipeline.operations) diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 00000000..ed64af4c --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,46 @@ +import pytest +from docetl.operations.map import MapOperation +from tests.conftest import api_wrapper, default_model, max_threads + + +@pytest.fixture +def map_config_with_validation(): + return { + "name": "sentiment_analysis_with_validation", + "type": "map", + "prompt": "Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral.", + "output": {"schema": {"sentiment": "string", "confidence": "float"}}, + "model": "gpt-4o-mini", + "validate": [ + "output['sentiment'] in ['positive', 'negative', 'neutral']", + "0 <= output['confidence'] <= 1", + ], + "num_retries_on_validate_failure": 2, + } + + +@pytest.fixture +def sample_data(): + return [ + {"text": "I love this product! It's amazing."}, + {"text": "This is the worst experience ever."}, + {"text": "The weather is okay today."}, + ] + + +def test_map_operation_with_validation( + map_config_with_validation, sample_data, api_wrapper, default_model, max_threads +): + operation = MapOperation( + api_wrapper, map_config_with_validation, default_model, max_threads + ) + results, cost = operation.execute(sample_data) + + assert len(results) == len(sample_data) + assert cost > 0 + + for result in results: + assert "sentiment" in result + assert "confidence" in result + assert result["sentiment"] in ["positive", "negative", "neutral"] + assert 0 <= result["confidence"] <= 1