Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 114 additions & 7 deletions src/typeagent/knowpro/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Licensed under the MIT License.

import asyncio
from collections.abc import Callable
from dataclasses import dataclass

from typechat import Result, TypeChatLanguageModel

Expand Down Expand Up @@ -71,17 +73,122 @@ async def extract_knowledge_from_text_batch(
return [results[i] for i in range(len(text_batch))]


@dataclass
class _MergedEntity:
"""Internal helper for merging entities."""

name: str
types: set[str]
facets: dict[str, set[str]]


def merge_concrete_entities(
entities: list[kplib.ConcreteEntity],
normalize: Callable[[str], str] = str.lower,
) -> list[kplib.ConcreteEntity]:
"""Merge a list of concrete entities into a single list of merged entities."""
raise NotImplementedError("TODO")
# merged_entities = concrete_to_merged_entities(entities)
"""Merge a list of concrete entities by name, combining types and facets.

Entities with the same name (after normalization) are merged:
- Names, types, and facet names/values are normalized for matching
- Types are combined into a sorted unique list (normalized)
- Facets with the same name have their unique values concatenated with "; "

Args:
entities: List of entities to merge.
normalize: Function to normalize strings for matching. Defaults to
str.lower for case-insensitive matching. Pass str to preserve
original casing (fast identity function for strings).

Note:
By default, this function normalizes all text to lowercase, matching
the TypeScript implementation in knowledgeMerge.ts. Facet values are
converted to strings during merging. Complex types like Quantity and
Quantifier use their __str__ representation (e.g., "5 kg" or "many items").

Returns:
A list of merged entities sorted by name for deterministic ordering.
"""
if not entities:
return []

# merged_concrete_entities = []
# for merged_entity in merged_entities.values():
# merged_concrete_entities.append(merged_to_concrete_entity(merged_entity))
# return merged_concrete_entities
# Build a dict of merged entities keyed by normalized name
merged: dict[str, _MergedEntity] = {}

for entity in entities:
name_key = normalize(entity.name)
existing = merged.get(name_key)

if existing is None:
# First occurrence - create new merged entity
merged[name_key] = _MergedEntity(
name=name_key,
types=set(normalize(t) for t in entity.type),
facets=(
_facets_to_merged(entity.facets, normalize) if entity.facets else {}
),
)
else:
# Merge into existing
existing.types.update(normalize(t) for t in entity.type)
if entity.facets:
_merge_facets(existing.facets, entity.facets, normalize)

# Convert merged entities back to ConcreteEntity, sorted by name
result = []
for merged_entity in sorted(merged.values(), key=lambda e: e.name):
concrete = kplib.ConcreteEntity(
name=merged_entity.name,
type=sorted(merged_entity.types),
)
if merged_entity.facets:
concrete.facets = _merged_to_facets(merged_entity.facets)
result.append(concrete)

return result


def _add_facet_to_merged(
merged: dict[str, set[str]],
facet: kplib.Facet,
normalize: Callable[[str], str],
) -> None:
"""Add a single facet to a merged facets dict."""
name = normalize(facet.name)
value = normalize(str(facet.value)) if facet.value is not None else ""
merged.setdefault(name, set()).add(value)


def _facets_to_merged(
facets: list[kplib.Facet],
normalize: Callable[[str], str],
) -> dict[str, set[str]]:
"""Convert a list of Facets to a merged facets dict.

Facet names and values are normalized for merging.
"""
merged: dict[str, set[str]] = {}
for facet in facets:
_add_facet_to_merged(merged, facet, normalize)
return merged


def _merge_facets(
existing: dict[str, set[str]],
facets: list[kplib.Facet],
normalize: Callable[[str], str],
) -> None:
"""Merge facets into an existing facets dict."""
for facet in facets:
_add_facet_to_merged(existing, facet, normalize)


def _merged_to_facets(merged_facets: dict[str, set[str]]) -> list[kplib.Facet]:
"""Convert a merged facets dict back to a list of Facets."""
facets = []
for name, values in sorted(merged_facets.items()):
if values:
facets.append(kplib.Facet(name=name, value="; ".join(sorted(values))))
return facets


def merge_topics(topics: list[str]) -> list[str]:
Expand Down
Loading