from jinja2 import Environment, Template
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple
from .base import BaseOperation
from .utils import RichLoopBar
from .clustering_utils import get_embeddings_for_clustering


class ClusterOperation(BaseOperation):
    """Group items into a binary tree of clusters and summarize each cluster.

    The items are clustered agglomeratively on their embeddings, every
    internal node of the resulting tree is summarized with an LLM call that
    sees its two children, and every input item is annotated with the path of
    cluster summaries leading from its closest cluster up to the root.

    NOTE: internal tree nodes are recognised by the presence of a
    ``"children"`` key, so input items must not carry a key of that name.
    """

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # float("inf") is the "no explicit limit" sentinel; any configured
        # value is an integer (enforced by syntax_check).
        self.max_batch_size: float = self.config.get(
            "max_batch_size", kwargs.get("max_batch_size", float("inf"))
        )

    def syntax_check(self) -> None:
        """
        Checks the configuration of the ClusterOperation for required keys and valid structure.

        Raises:
            ValueError: If required keys are missing or invalid in the configuration.
            TypeError: If configuration values have incorrect types.
        """
        required_keys = ["embedding_keys", "summary_schema", "summary_prompt"]
        for key in required_keys:
            if key not in self.config:
                raise ValueError(
                    f"Missing required key '{key}' in ClusterOperation configuration"
                )

        embedding_keys = self.config["embedding_keys"]
        if not isinstance(embedding_keys, list) or not all(
            isinstance(key, str) for key in embedding_keys
        ):
            raise TypeError("'embedding_keys' must be a list of strings")

        if "output_key" in self.config:
            if not isinstance(self.config["output_key"], str):
                raise TypeError("'output_key' must be a string")

        summary_schema = self.config["summary_schema"]
        if not isinstance(summary_schema, dict):
            raise TypeError("'summary_schema' must be a dictionary")
        # The summary is merged into the tree node itself; a "children" field
        # would overwrite the node's structural "children" entry.
        if "children" in summary_schema:
            raise ValueError(
                "'summary_schema' must not contain the reserved key 'children'"
            )

        if not isinstance(self.config["summary_prompt"], str):
            raise TypeError("'summary_prompt' must be a string")

        # Check if the prompt is a valid Jinja2 template
        try:
            Template(self.config["summary_prompt"])
        except Exception as e:
            raise ValueError(
                f"Invalid Jinja2 template in 'summary_prompt': {str(e)}"
            ) from e

        # Check optional parameters
        if "max_batch_size" in self.config:
            max_batch_size = self.config["max_batch_size"]
            # bool is a subclass of int, so it has to be rejected explicitly.
            if isinstance(max_batch_size, bool) or not isinstance(
                max_batch_size, int
            ):
                raise TypeError("'max_batch_size' must be an integer")
            if max_batch_size <= 0:
                raise ValueError("'max_batch_size' must be a positive integer")

        if "embedding_model" in self.config:
            if not isinstance(self.config["embedding_model"], str):
                raise TypeError("'embedding_model' must be a string")

        if "model" in self.config:
            if not isinstance(self.config["model"], str):
                raise TypeError("'model' must be a string")

        if "validate" in self.config:
            if not isinstance(self.config["validate"], list):
                raise TypeError("'validate' must be a list of strings")
            for rule in self.config["validate"]:
                if not isinstance(rule, str):
                    raise TypeError("Each validation rule must be a string")

    def execute(
        self, input_data: List[Dict], is_build: bool = False
    ) -> Tuple[List[Dict], float]:
        """
        Executes the cluster operation on the input data. The input
        dictionaries are modified in place and the same list is returned.

        Args:
            input_data (List[Dict]): A list of dictionaries to process.
            is_build (bool): Whether the operation is being executed
              in the build phase. Defaults to False.

        Returns:
            Tuple[List[Dict], float]: A tuple containing the clustered
              list of dictionaries and the total cost of the operation.
        """
        if not input_data:
            return input_data, 0

        output_key = self.config.get("output_key", "clusters")

        if len(input_data) == 1:
            # A single item cannot be merged with anything: empty path.
            input_data[0][output_key] = ()
            return input_data, 0

        embeddings, cost = get_embeddings_for_clustering(
            input_data, self.config, self.runner.api
        )

        tree = self.agglomerative_cluster_of_embeddings(input_data, embeddings)

        self.prompt_template = Template(self.config["summary_prompt"])
        cost += self.annotate_clustering_tree(tree)
        self.annotate_leaves(tree)

        return input_data, cost

    def agglomerative_cluster_of_embeddings(self, input_data, embeddings):
        """Build the binary merge tree for ``embeddings``.

        Leaves of the returned tree are the dictionaries of ``input_data``
        themselves (not copies); every internal node is a dict with the keys
        ``"children"`` (a list of two nodes) and ``"distance"``.

        Args:
            input_data: The items being clustered, aligned with ``embeddings``.
            embeddings: One embedding per item.

        Returns:
            The root node of the tree.
        """
        # Imported lazily so scikit-learn is only needed when clustering runs.
        import sklearn.cluster

        cl = sklearn.cluster.AgglomerativeClustering(
            compute_full_tree=True, compute_distances=True
        )
        cl.fit(embeddings)

        nsamples = len(embeddings)

        # Node ids < nsamples are the samples; id nsamples + k is the merge in
        # row k of children_. A merge only references nodes created before
        # it, so the tree can be assembled in a single forward pass without
        # recursion (which could exceed the recursion limit on deep trees).
        nodes = [input_data[i] for i in range(nsamples)]
        for (left, right), distance in zip(cl.children_, cl.distances_):
            nodes.append(
                {
                    "children": [nodes[int(left)], nodes[int(right)]],
                    "distance": float(distance),
                }
            )

        # The last merge joins everything and therefore is the root.
        return nodes[-1]

    def annotate_clustering_tree(self, t):
        """Summarize every cluster in the tree rooted at ``t``.

        Clusters are processed bottom-up, because the prompt of a cluster is
        rendered from its (already summarized) children. Clusters of the same
        height are independent of each other and are summarized concurrently
        on one shared thread pool bounded by ``max_batch_size``.

        Args:
            t: Root node of the (sub)tree; internal nodes are updated in place
              with the fields of their summary.

        Returns:
            The total cost of all LLM calls; 0 if ``t`` is a leaf.
        """
        levels = self._cluster_levels(t)
        if not levels:
            return 0

        # Index (in overall submission order) of the first cluster of a level.
        first_index_of_level = {}
        num_clusters = 0
        for level in levels:
            first_index_of_level[num_clusters] = level
            num_clusters += len(level)

        total_cost = 0
        futures = []
        with ThreadPoolExecutor(max_workers=self._max_workers()) as executor:
            pbar = RichLoopBar(
                range(num_clusters),
                desc=f"Processing {self.config['name']} (cluster) on all documents",
                console=self.console,
            )
            for i in pbar:
                if i in first_index_of_level:
                    # Every future before index i has been waited for, so all
                    # lower clusters are summarized and this level can start.
                    futures.extend(
                        executor.submit(self._summarize_cluster, node)
                        for node in first_index_of_level[i]
                    )
                total_cost += futures[i].result()
                pbar.update(i)

        return total_cost

    def _max_workers(self) -> Optional[int]:
        """Translate ``max_batch_size`` into a ThreadPoolExecutor ``max_workers``."""
        limit = self.max_batch_size
        if limit is None or limit == float("inf"):
            # No explicit limit configured: use the executor's default size.
            return None
        return max(1, int(limit))

    def _cluster_levels(self, root) -> List[List[Dict]]:
        """Group the internal nodes below ``root`` by height, lowest first.

        ``result[0]`` holds the clusters whose children are all leaves,
        ``result[1]`` those with at least one child in ``result[0]``, etc.
        """
        levels: List[List[Dict]] = []
        heights: Dict[int, int] = {}  # id(node) -> height; leaves are absent (0)
        stack = [(root, False)]
        while stack:
            node, children_done = stack.pop()
            if "children" not in node:
                continue
            if not children_done:
                # Revisit this node once its whole subtree has been handled.
                stack.append((node, True))
                stack.extend((child, False) for child in node["children"])
                continue
            height = 1 + max(
                (heights.get(id(child), 0) for child in node["children"]),
                default=0,
            )
            heights[id(node)] = height
            if height > len(levels):
                levels.append([])
            levels[height - 1].append(node)
        return levels

    def _summarize_cluster(self, t) -> float:
        """Summarize one cluster from its two children; return the LLM cost."""
        children = t["children"]
        assert len(children) == 2, (
            "Agglomerative clustering is supposed to generate clusters with 2 children each, but this cluster has %s"
            % len(children)
        )
        prompt = self.prompt_template.render(left=children[0], right=children[1])
        model = self.config.get("model", self.default_model)

        def validation_fn(response: Dict[str, Any]):
            output = self.runner.api.parse_llm_response(
                response,
                schema=self.config["summary_schema"],
                manually_fix_errors=self.manually_fix_errors,
            )[0]
            if self.runner.api.validate_output(self.config, output, self.console):
                return output, True
            return output, False

        output, cost, _success = self.runner.api.call_llm_with_validation(
            [{"role": "user", "content": prompt}],
            model=model,
            operation_type="cluster",
            schema=self.config["summary_schema"],
            llm_call_fn=lambda messages: self.runner.api.call_llm(
                model,
                "cluster",
                messages,
                self.config["summary_schema"],
                tools=self.config.get("tools", None),
                console=self.console,
                timeout_seconds=self.config.get("timeout", 120),
                max_retries_per_timeout=self.config.get(
                    "max_retries_per_timeout", 2
                ),
            ),
            validation_fn=validation_fn,
            val_rule=self.config.get("validate", []),
            num_retries=self.num_retries_on_validate_failure,
            console=self.console,
        )

        # The summary fields live directly on the node, next to "children"
        # and "distance", so parents can read them as left.<field>.
        t.update(output)
        return cost

    def annotate_leaves(self, tree, path=()):
        """Store on every leaf the path of clusters that contain it.

        The path is a tuple ordered from the closest enclosing cluster to the
        root; each entry is a copy of the cluster node without its
        ``"children"``. It is written to the configured ``output_key``
        (default ``"clusters"``).

        Args:
            tree: Root node of the (sub)tree to annotate.
            path: Path of the clusters enclosing ``tree`` itself.
        """
        output_key = self.config.get("output_key", "clusters")
        # Explicit stack instead of recursion: the tree can be very deep.
        stack = [(tree, tuple(path))]
        while stack:
            node, node_path = stack.pop()
            if "children" in node:
                item = {k: v for k, v in node.items() if k != "children"}
                child_path = (item,) + node_path
                stack.extend((child, child_path) for child in node["children"])
            else:
                node[output_key] = node_path
+The Cluster operation in DocETL groups all items into a binary tree +using [agglomerative +clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering#Agglomerative_clustering_example) +of the embeddings of the configured keys, and annotates each item with the path +through this tree down to the item (note that the path is reversed: +it starts with the most specific grouping and ends at the root of +the tree, the cluster that encompasses all of your input). + +Each cluster is summarized using an LLM prompt, taking the summaries +of its children as inputs (or, for children that are leaf nodes, the actual items). + +## 🚀 Example: Grouping concepts from a knowledge graph + +```yaml +- name: cluster_concepts + type: cluster + max_batch_size: 5 + embedding_keys: + - concept + - description + output_key: categories # This is optional, and defaults to "clusters" + summary_schema: + concept: str + description: str + summary_prompt: | + The following describes two related concepts. What concept + encompasses both? Try not to be too broad; it might be that one of + these two concepts already encompasses the other; in that case, + you should just use that concept. + + {{left.concept}}: + {{left.description}} + + {{right.concept}}: + {{right.description}} + + Provide the title of the super-concept, and a description. +``` + +This cluster operation processes a set of concepts, each with a title +and a description, and groups them into a tree of categories. + +??? example "Sample Input and Output" + + Input: + ```json + [ + { + "concept": "Shed", + "description": "A shed is typically a simple, single-story roofed structure, often used for storage, for hobbies, or as a workshop, and typically serving as outbuilding, such as in a back garden or on an allotment. Sheds vary considerably in their size and complexity of construction, from simple open-sided ones designed to cover bicycles or garden items to large wood-framed structures with shingled roofs, windows, and electrical outlets. 
Sheds used on farms or in the industry can be large structures. The main types of shed construction are metal sheathing over a metal frame, plastic sheathing and frame, all-wood construction (the roof may be asphalt shingled or sheathed in tin), and vinyl-sided sheds built over a wooden frame. Small sheds may include a wooden or plastic floor, while more permanent ones may be built on a concrete pad or foundation. Sheds may be lockable to deter theft or entry by children, domestic animals, wildlife, etc." + }, + { + "concept": "Barn", + "description": "A barn is an agricultural building usually on farms and used for various purposes. In North America, a barn refers to structures that house livestock, including cattle and horses, as well as equipment and fodder, and often grain.[2] As a result, the term barn is often qualified e.g. tobacco barn, dairy barn, cow house, sheep barn, potato barn. In the British Isles, the term barn is restricted mainly to storage structures for unthreshed cereals and fodder, the terms byre or shippon being applied to cow shelters, whereas horses are kept in buildings known as stables.[2][3] In mainland Europe, however, barns were often part of integrated structures known as byre-dwellings (or housebarns in US literature). In addition, barns may be used for equipment storage, as a covered workplace, and for activities such as threshing." + }, + { + "concept": "Tree house", + "description": "A tree house, tree fort or treeshed, is a platform or building constructed around, next to or among the trunk or branches of one or more mature trees while above ground level. Tree houses can be used for recreation, work space, habitation, a hangout space and observation. People occasionally connect ladders or staircases to get up to the platforms." + }, + { + "concept": "Castle", + "description": "A castle is a type of fortified structure built during the Middle Ages predominantly by the nobility or royalty and by military orders. 
Scholars usually consider a castle to be the private fortified residence of a lord or noble. This is distinct from a mansion, palace, and villa, whose main purpose was exclusively for pleasance and are not primarily fortresses but may be fortified.[a] Use of the term has varied over time and, sometimes, has also been applied to structures such as hill forts and 19th- and 20th-century homes built to resemble castles. Over the Middle Ages, when genuine castles were built, they took on a great many forms with many different features, although some, such as curtain walls, arrowslits, and portcullises, were commonplace." + }, + { + "concept": "Fortress", + "description": "A fortification (also called a fort, fortress, fastness, or stronghold) is a military construction designed for the defense of territories in warfare, and is used to establish rule in a region during peacetime. The term is derived from Latin fortis ('strong') and facere ('to make'). From very early history to modern times, defensive walls have often been necessary for cities to survive in an ever-changing world of invasion and conquest. Some settlements in the Indus Valley Civilization were the first small cities to be fortified. In ancient Greece, large stone walls had been built in Mycenaean Greece, such as the ancient site of Mycenae (known for the huge stone blocks of its 'cyclopean' walls). A Greek phrourion was a fortified collection of buildings used as a military garrison, and is the equivalent of the Roman castellum or fortress. These constructions mainly served the purpose of a watch tower, to guard certain roads, passes, and borders. Though smaller than a real fortress, they acted as a border guard rather than a real strongpoint to watch and maintain the border." 
+ } + ] + ``` + + Output: + ```json + [ + { + "concept": "Shed", + "description": "A shed is typically a simple, single-story roofed structure, often used for storage, for hobbies, or as a workshop, and typically serving as outbuilding, such as in a back garden or on an allotment. Sheds vary considerably in their size and complexity of construction, from simple open-sided ones designed to cover bicycles or garden items to large wood-framed structures with shingled roofs, windows, and electrical outlets. Sheds used on farms or in the industry can be large structures. The main types of shed construction are metal sheathing over a metal frame, plastic sheathing and frame, all-wood construction (the roof may be asphalt shingled or sheathed in tin), and vinyl-sided sheds built over a wooden frame. Small sheds may include a wooden or plastic floor, while more permanent ones may be built on a concrete pad or foundation. Sheds may be lockable to deter theft or entry by children, domestic animals, wildlife, etc.", + "categories": [ + { + "distance": 0.9907871670904073, + "concept": "Outbuildings", + "description": "Outbuildings are structures that are separate from a main building, typically located on a property for purposes such as storage, workshops, or housing animals and equipment. This category includes structures like sheds and barns, which serve specific functions like storing tools, equipment, or livestock." + }, + { + "distance": 1.148880974178631, + "concept": "Auxiliary Structures", + "description": "Auxiliary structures are secondary or additional buildings that serve various practical purposes related to a main dwelling or property. This category encompasses structures like tree houses and outbuildings, which provide functional, recreational, or storage spaces, often designed to enhance the usability of the property." 
+ }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Barn", + "description": "A barn is an agricultural building usually on farms and used for various purposes. In North America, a barn refers to structures that house livestock, including cattle and horses, as well as equipment and fodder, and often grain.[2] As a result, the term barn is often qualified e.g. tobacco barn, dairy barn, cow house, sheep barn, potato barn. In the British Isles, the term barn is restricted mainly to storage structures for unthreshed cereals and fodder, the terms byre or shippon being applied to cow shelters, whereas horses are kept in buildings known as stables.[2][3] In mainland Europe, however, barns were often part of integrated structures known as byre-dwellings (or housebarns in US literature). In addition, barns may be used for equipment storage, as a covered workplace, and for activities such as threshing.", + "categories": [ + { + "distance": 0.9907871670904073, + "concept": "Outbuildings", + "description": "Outbuildings are structures that are separate from a main building, typically located on a property for purposes such as storage, workshops, or housing animals and equipment. This category includes structures like sheds and barns, which serve specific functions like storing tools, equipment, or livestock." 
+ }, + { + "distance": 1.148880974178631, + "concept": "Auxiliary Structures", + "description": "Auxiliary structures are secondary or additional buildings that serve various practical purposes related to a main dwelling or property. This category encompasses structures like tree houses and outbuildings, which provide functional, recreational, or storage spaces, often designed to enhance the usability of the property." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Tree house", + "description": "A tree house, tree fort or treeshed, is a platform or building constructed around, next to or among the trunk or branches of one or more mature trees while above ground level. Tree houses can be used for recreation, work space, habitation, a hangout space and observation. People occasionally connect ladders or staircases to get up to the platforms.", + "categories": [ + { + "distance": 1.148880974178631, + "concept": "Auxiliary Structures", + "description": "Auxiliary structures are secondary or additional buildings that serve various practical purposes related to a main dwelling or property. This category encompasses structures like tree houses and outbuildings, which provide functional, recreational, or storage spaces, often designed to enhance the usability of the property." 
+ }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Castle", + "description": "A castle is a type of fortified structure built during the Middle Ages predominantly by the nobility or royalty and by military orders. Scholars usually consider a castle to be the private fortified residence of a lord or noble. This is distinct from a mansion, palace, and villa, whose main purpose was exclusively for pleasance and are not primarily fortresses but may be fortified.[a] Use of the term has varied over time and, sometimes, has also been applied to structures such as hill forts and 19th- and 20th-century homes built to resemble castles. Over the Middle Ages, when genuine castles were built, they took on a great many forms with many different features, although some, such as curtain walls, arrowslits, and portcullises, were commonplace.", + "categories": [ + { + "distance": 0.9152435235428339, + "concept": "Fortified structures", + "description": "Fortified structures refer to buildings designed to protect from attacks and enhance defense. This category encompasses various forms of military architecture, including castles and fortresses. Castles serve as private residences for nobility or military orders with substantial fortification features, while fortresses are broader military constructions aimed at defending territories and establishing control. 
Both types share the common purpose of defense against invasion, though they serve different social and functional roles." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Fortress", + "description": "A fortification (also called a fort, fortress, fastness, or stronghold) is a military construction designed for the defense of territories in warfare, and is used to establish rule in a region during peacetime. The term is derived from Latin fortis ('strong') and facere ('to make'). From very early history to modern times, defensive walls have often been necessary for cities to survive in an ever-changing world of invasion and conquest. Some settlements in the Indus Valley Civilization were the first small cities to be fortified. In ancient Greece, large stone walls had been built in Mycenaean Greece, such as the ancient site of Mycenae (known for the huge stone blocks of its 'cyclopean' walls). A Greek phrourion was a fortified collection of buildings used as a military garrison, and is the equivalent of the Roman castellum or fortress. These constructions mainly served the purpose of a watch tower, to guard certain roads, passes, and borders. 
Though smaller than a real fortress, they acted as a border guard rather than a real strongpoint to watch and maintain the border.", + "categories": [ + { + "distance": 0.9152435235428339, + "concept": "Fortified structures", + "description": "Fortified structures refer to buildings designed to protect from attacks and enhance defense. This category encompasses various forms of military architecture, including castles and fortresses. Castles serve as private residences for nobility or military orders with substantial fortification features, while fortresses are broader military constructions aimed at defending territories and establishing control. Both types share the common purpose of defense against invasion, though they serve different social and functional roles." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + } + ] + ``` + +## Required Parameters + +- `name`: A unique name for the operation. +- `type`: Must be set to "cluster". +- `embedding_keys`: A list of keys to use for the embedding that is clustered on +- `summary_prompt`: The prompt used to summarize a cluster based on its children. Access input variables with `left.keyname` or `right.keyname`. +- `summary_schema`: The schema for the summary of each cluster. This is the output schema for the `summary_prompt` based llm call. 
+ +## Optional Parameters + +| Parameter | Description | Default | +| ------------------------- | -------------------------------------------------------------------------------- | ----------------------------- | +| `output_key` | The name of the output key where the cluster path will be inserted in the items. | "clusters" | +| `model` | The language model to use | Falls back to `default_model` | +| `embedding_model` | The embedding model to use | "text-embedding-3-small" | +| `max_batch_size` | Maximum number of worker threads (an integer) of the thread pool used to summarize clusters | No explicit limit | +| `tools` | List of tool definitions for LLM use | None | +| `timeout` | Timeout for each LLM call in seconds | 120 | +| `max_retries_per_timeout` | Maximum number of retries per timeout | 2 | +| `validate` | List of Python expressions to validate the output | None | +| `sample` | Number of items to sample for this operation | None | diff --git a/mkdocs.yml b/mkdocs.yml index dac2122a..e995670e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -30,6 +30,7 @@ nav: - Parallel Map: operators/parallel-map.md - Filter: operators/filter.md - Equijoin: operators/equijoin.md + - Cluster: operators/cluster.md - Auxiliary Operators: - Split: operators/split.md - Gather: operators/gather.md diff --git a/pyproject.toml b/pyproject.toml index 274d4650..e0cf9bb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,6 +87,7 @@ split = "docetl.operations.split:SplitOperation" reduce = "docetl.operations.reduce:ReduceOperation" resolve = "docetl.operations.resolve:ResolveOperation" gather = "docetl.operations.gather:GatherOperation" +cluster = "docetl.operations.cluster:ClusterOperation" [tool.poetry.plugins."docetl.parser"] llama_index_simple_directory_reader = "docetl.parsing_tools:llama_index_simple_directory_reader" diff --git a/tests/basic/test_cluster.py b/tests/basic/test_cluster.py new file mode 100644 index 00000000..3db8424b --- /dev/null +++ b/tests/basic/test_cluster.py @@ -0,0 +1,117 @@ +import pytest +from docetl.operations.cluster import ClusterOperation +from tests.conftest import api_wrapper, 
default_model, max_threads


def _make_operation(config, api_wrapper, default_model, max_threads):
    """Instantiate the operation under test from the shared fixtures."""
    return ClusterOperation(api_wrapper, config, default_model, max_threads)


@pytest.fixture
def cluster_config():
    """Configuration that clusters concepts and names a super-concept per cluster."""
    summary_prompt = """
        The following describes two related concepts. What concept
        encompasses both? Try not to be too broad; it might be that one of
        these two concepts already encompasses the other; in that case,
        you should just use that concept.

        {{left.concept}}:
        {{left.description}}

        {{right.concept}}:
        {{right.description}}

        Provide the title of the super-concept, and a description.
        """
    return {
        "name": "test_cluster",
        "type": "cluster",
        "embedding_keys": ["concept", "description"],
        "output_key": "categories",
        "summary_schema": {"concept": "string", "description": "string"},
        "summary_prompt": summary_prompt,
        "model": "gpt-4o-mini",
    }


@pytest.fixture
def sample_data():
    """Eight building concepts, each with a one-sentence description."""
    concepts = [
        (
            "Shed",
            "A simple, single-story roofed structure, often used for storage or as a workshop.",
        ),
        (
            "Barn",
            "A large agricultural building used for storing farm products and sheltering livestock.",
        ),
        (
            "Tree house",
            "A small house built among the branches of a tree for children to play in.",
        ),
        (
            "Skyscraper",
            "A very tall building of many stories, typically found in urban areas.",
        ),
        (
            "Castle",
            "A large fortified building or set of buildings from the medieval period.",
        ),
        (
            "Igloo",
            "A dome-shaped dwelling made of blocks of solid snow, traditionally built by Inuit people.",
        ),
        (
            "Lighthouse",
            "A tower with a bright light at the top, used to warn or guide ships at sea.",
        ),
        (
            "Windmill",
            "A building with sails or vanes that turn in the wind and generate power to grind grain into flour.",
        ),
    ]
    return [
        {"concept": concept, "description": description}
        for concept, description in concepts
    ]


def test_cluster_operation(
    cluster_config, sample_data, api_wrapper, default_model, max_threads
):
    """Every item gets a non-empty tuple of summarized categories."""
    operation = _make_operation(
        cluster_config, api_wrapper, default_model, max_threads
    )
    results, cost = operation.execute(sample_data)

    assert len(results) == len(sample_data)
    assert cost > 0

    for result in results:
        assert "categories" in result
        categories = result["categories"]
        assert isinstance(categories, tuple)
        assert len(categories) > 0

        for category in categories:
            assert "concept" in category
            assert "description" in category


def test_cluster_operation_empty_input(
    cluster_config, api_wrapper, default_model, max_threads
):
    """An empty input yields an empty result at no cost."""
    operation = _make_operation(
        cluster_config, api_wrapper, default_model, max_threads
    )
    results, cost = operation.execute([])

    assert len(results) == 0
    assert cost == 0


def test_cluster_operation_single_item(
    cluster_config, api_wrapper, default_model, max_threads
):
    """A single item is returned with a tuple-typed path and no cost."""
    house = {"concept": "House", "description": "A building for human habitation."}
    operation = _make_operation(
        cluster_config, api_wrapper, default_model, max_threads
    )
    results, cost = operation.execute([house])

    assert len(results) == 1
    assert cost == 0
    assert "categories" in results[0]
    assert isinstance(results[0]["categories"], tuple)