diff --git a/wren-ai-service/config.example.yaml b/wren-ai-service/config.example.yaml index 64c41add3..86c44ad93 100644 --- a/wren-ai-service/config.example.yaml +++ b/wren-ai-service/config.example.yaml @@ -103,3 +103,5 @@ pipes: - name: sql_regeneration llm: openai_llm.gpt-4o-mini engine: wren_ui + - name: semantics_description + llm: openai_llm.gpt-4o-mini diff --git a/wren-ai-service/src/globals.py b/wren-ai-service/src/globals.py index ea337e746..d03b5e2ef 100644 --- a/wren-ai-service/src/globals.py +++ b/wren-ai-service/src/globals.py @@ -8,6 +8,7 @@ from src.core.provider import EmbedderProvider, LLMProvider from src.pipelines.generation import ( followup_sql_generation, + semantics_description, sql_answer, sql_breakdown, sql_correction, @@ -21,6 +22,7 @@ from src.pipelines.retrieval import historical_question, retrieval from src.web.v1.services.ask import AskService from src.web.v1.services.ask_details import AskDetailsService +from src.web.v1.services.semantics_description import SemanticsDescription from src.web.v1.services.semantics_preparation import SemanticsPreparationService from src.web.v1.services.sql_answer import SqlAnswerService from src.web.v1.services.sql_expansion import SqlExpansionService @@ -32,6 +34,7 @@ @dataclass class ServiceContainer: + semantics_description: SemanticsDescription semantics_preparation_service: SemanticsPreparationService ask_service: AskService sql_answer_service: SqlAnswerService @@ -55,6 +58,14 @@ def create_service_container( query_cache: Optional[dict] = {}, ) -> ServiceContainer: return ServiceContainer( + semantics_description=SemanticsDescription( + pipelines={ + "semantics_description": semantics_description.SemanticsDescription( + **pipe_components["semantics_description"], + ) + }, + **query_cache, + ), semantics_preparation_service=SemanticsPreparationService( pipelines={ "indexing": indexing.Indexing( diff --git a/wren-ai-service/src/pipelines/generation/semantics_description.py 
import json
import logging
import sys
from pathlib import Path
from typing import Any

import orjson
from hamilton import base
from hamilton.experimental.h_async import AsyncDriver
from haystack.components.builders.prompt_builder import PromptBuilder
from langfuse.decorators import observe

from src.core.pipeline import BasicPipeline, async_validate
from src.core.provider import LLMProvider

logger = logging.getLogger("wren-ai-service")


## Start of Pipeline
@observe(capture_input=False)
def picked_models(mdl: dict, selected_models: list[str]) -> list[dict]:
    """Return the subset of MDL models named in *selected_models*, trimmed to
    the fields the LLM prompt needs (name, columns, properties)."""

    def extract(model: dict) -> dict:
        return {
            "name": model["name"],
            "columns": model["columns"],
            "properties": model["properties"],
        }

    return [
        extract(model) for model in mdl["models"] if model["name"] in selected_models
    ]


@observe(capture_input=False)
def prompt(
    picked_models: list[dict],
    user_prompt: str,
    prompt_builder: PromptBuilder,
) -> dict:
    """Render the user-prompt template with the picked models."""
    logger.debug(f"User prompt: {user_prompt}")
    logger.debug(f"Picked models: {picked_models}")
    return prompt_builder.run(picked_models=picked_models, user_prompt=user_prompt)


@observe(as_type="generation", capture_input=False)
async def generate(prompt: dict, generator: Any) -> dict:
    """Call the LLM generator with the rendered prompt.

    NOTE: the parameter is named `prompt` on purpose — Hamilton wires node
    inputs by the name of the upstream node.
    """
    logger.debug(f"prompt: {orjson.dumps(prompt, option=orjson.OPT_INDENT_2).decode()}")
    return await generator.run(prompt=prompt.get("prompt"))


@observe(capture_input=False)
def normalize(generate: dict) -> dict:
    """Parse the LLM reply into a ``{model_name: model_payload}`` mapping.

    Degrades gracefully: returns an empty dict when the generator produced no
    reply, the reply is not valid JSON, or the parsed JSON has no "models"
    key, instead of raising.
    """

    def wrapper(text: str) -> dict:
        text = text.replace("\n", " ")
        text = " ".join(text.split())
        # Convert the normalized text to a dictionary
        try:
            return orjson.loads(text.strip())
        except orjson.JSONDecodeError as e:
            logger.error(f"Error decoding JSON: {e}")
            return {}  # Return an empty dictionary if JSON decoding fails

    logger.debug(
        f"generate: {orjson.dumps(generate, option=orjson.OPT_INDENT_2).decode()}"
    )

    replies = generate.get("replies") or []
    if not replies:
        # Defensive: a missing or empty "replies" list previously raised
        # TypeError/IndexError on `generate.get("replies")[0]`.
        logger.error("Generator returned no replies")
        return {}

    normalized = wrapper(replies[0])  # Expecting only one reply

    # Use .get() so a decode failure (wrapper returned {}) or a reply without a
    # "models" key yields {} instead of raising KeyError.
    return {model["name"]: model for model in normalized.get("models", [])}


## End of Pipeline

system_prompt = """
I have a data model represented in JSON format, with the following structure:

```
[
    {'name': 'model', 'columns': [
            {'name': 'column_1', 'type': 'type', 'notNull': True, 'properties': {}
            },
            {'name': 'column_2', 'type': 'type', 'notNull': True, 'properties': {}
            },
            {'name': 'column_3', 'type': 'type', 'notNull': False, 'properties': {}
            }
        ], 'properties': {}
    }
]
```

Your task is to update this JSON structure by adding a `description` field inside both the `properties` attribute of each `column` and the `model` itself.
Each `description` should be derived from a user-provided input that explains the purpose or context of the `model` and its respective columns.
Follow these steps:
1. **For the `model`**: Prompt the user to provide a brief description of the model's overall purpose or its context. Insert this description in the `properties` field of the `model`.
2. **For each `column`**: Ask the user to describe each column's role or significance. Each column's description should be added under its respective `properties` field in the format: `'description': 'user-provided text'`.
3. Ensure that the output is a well-formatted JSON structure, preserving the input's original format and adding the appropriate `description` fields.

### Output Format:

```
{
    "models": [
        {
            "name": "model",
            "columns": [
                {
                    "name": "column_1",
                    "properties": {
                        "description": ""
                    }
                },
                {
                    "name": "column_2",
                    "properties": {
                        "description": ""
                    }
                },
                {
                    "name": "column_3",
                    "properties": {
                        "description": ""
                    }
                }
            ],
            "properties": {
                "description": ""
            }
        }
    ]
}
```

Make sure that the descriptions are concise, informative, and contextually appropriate based on the input provided by the user.
"""

user_prompt_template = """
### Input:
User's prompt: {{ user_prompt }}
Picked models: {{ picked_models }}

Please provide a brief description for the model and each column based on the user's prompt.
"""


class SemanticsDescription(BasicPipeline):
    """Hamilton pipeline that asks the LLM to describe selected MDL models."""

    def __init__(self, llm_provider: LLMProvider, **_):
        self._components = {
            "prompt_builder": PromptBuilder(template=user_prompt_template),
            "generator": llm_provider.get_generator(system_prompt=system_prompt),
        }
        # Terminal node of the DAG; its output is the pipeline result.
        self._final = "normalize"

        super().__init__(
            AsyncDriver({}, sys.modules[__name__], result_builder=base.DictResult())
        )

    def visualize(
        self,
        user_prompt: str,
        selected_models: list[str],
        mdl: dict,
    ) -> None:
        """Write a Graphviz rendering of the execution plan to disk."""
        destination = Path("outputs/pipelines/generation")
        # mkdir(exist_ok=True) is idempotent; no need for a prior exists() check.
        destination.mkdir(parents=True, exist_ok=True)

        self._pipe.visualize_execution(
            [self._final],
            output_file_path=f"{destination}/semantics_description.dot",
            inputs={
                "user_prompt": user_prompt,
                "selected_models": selected_models,
                "mdl": mdl,
                **self._components,
            },
            show_legend=True,
            orient="LR",
        )

    @observe(name="Semantics Description Generation")
    async def run(
        self,
        user_prompt: str,
        selected_models: list[str],
        mdl: dict,
    ) -> dict:
        """Execute the pipeline and return the Hamilton result dict
        (keyed by node name, e.g. result["normalize"])."""
        logger.info("Semantics Description Generation pipeline is running...")
        return await self._pipe.execute(
            [self._final],
            inputs={
                "user_prompt": user_prompt,
                "selected_models": selected_models,
                "mdl": mdl,
                **self._components,
            },
        )


if __name__ == "__main__":
    from langfuse.decorators import langfuse_context

    from src.core.engine import EngineConfig
    from src.providers import init_providers
    from src.utils import init_langfuse, load_env_vars

    load_env_vars()
    init_langfuse()

    llm_provider, _, _, _ = init_providers(EngineConfig())
    pipeline = SemanticsDescription(llm_provider=llm_provider)

    with open("sample/college_3_bigquery_mdl.json", "r") as file:
        mdl = json.load(file)

    # Renamed from `input` to avoid shadowing the builtin.
    pipeline_input = {
        "user_prompt": "Track student enrollments, grades, and GPA calculations to monitor academic performance and identify areas for student support",
        "selected_models": [
            "Student",
            "Minor_in",
            "Member_of",
            "Gradeconversion",
            "Faculty",
            "Enrolled_in",
            "Department",
            "Course",
        ],
        "mdl": mdl,
    }

    pipeline.visualize(**pipeline_input)
    async_validate(lambda: pipeline.run(**pipeline_input))

    langfuse_context.flush()
import uuid
from dataclasses import asdict
from typing import Literal, Optional

from fastapi import APIRouter, BackgroundTasks, Depends
from pydantic import BaseModel

from src.globals import (
    ServiceContainer,
    ServiceMetadata,
    get_service_container,
    get_service_metadata,
)
from src.web.v1.services.semantics_description import SemanticsDescription

router = APIRouter()

"""
Semantics Description Router

This router handles endpoints related to generating and retrieving semantic descriptions.

Endpoints:
1. POST /semantics-descriptions
   - Generates a new semantic description
   - Request body: PostRequest
     {
       "selected_models": ["model1", "model2"],  # List of model names to describe
       "user_prompt": "Describe these models",   # User's instruction for description
       "mdl": "{ ... }"                          # JSON string of the MDL (Model Definition Language)
     }
   - Response: PostResponse
     {
       "id": "unique-uuid"  # Unique identifier for the generated description
     }

2. GET /semantics-descriptions/{id}
   - Retrieves the status and result of a semantic description generation
   - Path parameter: id (str)
   - Response: GetResponse
     {
       "id": "unique-uuid",  # Unique identifier of the description
       "status": "generating" | "finished" | "failed",
       "response": {         # Present only if status is "finished"
         "model1": {"columns": [...], "properties": {...}},
         "model2": {"columns": [...], "properties": {...}}
       },
       "error": {            # Present only if status is "failed"
         "code": "OTHERS",
         "message": "Error description"
       }
     }

The semantic description generation is an asynchronous process. The POST endpoint
initiates the generation and returns immediately with an ID. The GET endpoint can
then be used to check the status and retrieve the result when it's ready.

Usage:
1. Send a POST request to start the generation process.
2. Use the returned ID to poll the GET endpoint until the status is "finished" or "failed".

Note: The actual generation is performed in the background using FastAPI's BackgroundTasks.
"""


class PostRequest(BaseModel):
    selected_models: list[str]
    user_prompt: str
    mdl: str


class PostResponse(BaseModel):
    id: str


@router.post(
    "/semantics-descriptions",
    response_model=PostResponse,
)
async def generate(
    request: PostRequest,
    background_tasks: BackgroundTasks,
    service_container: ServiceContainer = Depends(get_service_container),
    service_metadata: ServiceMetadata = Depends(get_service_metadata),
) -> PostResponse:
    """Kick off an asynchronous semantics-description generation and return its ID."""
    resource_id = str(uuid.uuid4())  # renamed from `id` to avoid shadowing the builtin
    service = service_container.semantics_description

    # Register the resource before scheduling the task so a GET issued
    # immediately after this POST resolves instead of reporting "not found".
    service[resource_id] = SemanticsDescription.Resource(id=resource_id)
    generation_input = SemanticsDescription.Input(
        id=resource_id,
        selected_models=request.selected_models,
        user_prompt=request.user_prompt,
        mdl=request.mdl,
    )

    background_tasks.add_task(
        service.generate, generation_input, service_metadata=asdict(service_metadata)
    )
    return PostResponse(id=resource_id)


class GetResponse(BaseModel):
    id: str
    status: Literal["generating", "finished", "failed"]
    # Explicit `= None` defaults: these fields are genuinely optional
    # (absent while generating / on failure).
    response: Optional[dict] = None
    error: Optional[dict] = None


@router.get(
    "/semantics-descriptions/{id}",
    response_model=GetResponse,
)
async def get(
    id: str,  # must stay `id` — it binds the {id} path parameter
    service_container: ServiceContainer = Depends(get_service_container),
) -> GetResponse:
    """Return the current status (and result, once finished) of a generation."""
    resource = service_container.semantics_description[id]

    return GetResponse(
        id=resource.id,
        status=resource.status,
        response=resource.response,
        # `and` short-circuits to None when there is no error object.
        error=resource.error and resource.error.model_dump(),
    )
import logging
from typing import Dict, Literal, Optional

import orjson
from cachetools import TTLCache
from langfuse.decorators import observe
from pydantic import BaseModel

from src.core.pipeline import BasicPipeline
from src.utils import trace_metadata

logger = logging.getLogger("wren-ai-service")


class SemanticsDescription:
    """Service that generates model/column descriptions via an LLM pipeline
    and caches the per-request results in a TTL cache, keyed by resource ID."""

    class Input(BaseModel):
        id: str
        selected_models: list[str]
        user_prompt: str
        mdl: str  # MDL as a JSON string; parsed in generate()

    class Resource(BaseModel):
        class Error(BaseModel):
            code: Literal["OTHERS"]
            message: str

        id: str
        # Fix: the default was `None`, which is outside the Literal and caused
        # the router's GetResponse validation to fail for resources queried
        # before generation completed. A freshly-created resource is "generating".
        status: Literal["generating", "finished", "failed"] = "generating"
        response: Optional[dict] = None
        error: Optional[Error] = None

    def __init__(
        self,
        pipelines: Dict[str, BasicPipeline],
        maxsize: int = 1_000_000,
        ttl: int = 120,
    ):
        self._pipelines = pipelines
        # Results expire after `ttl` seconds; clients are expected to poll.
        self._cache: Dict[str, SemanticsDescription.Resource] = TTLCache(
            maxsize=maxsize, ttl=ttl
        )

    def _handle_exception(self, request: Input, error_message: str):
        """Store a failed Resource for this request and log the reason."""
        self[request.id] = self.Resource(
            id=request.id,
            status="failed",
            error=self.Resource.Error(code="OTHERS", message=error_message),
        )
        logger.error(error_message)

    @observe(name="Generate Semantics Description")
    @trace_metadata
    async def generate(self, request: Input, **kwargs) -> Resource:
        """Run the semantics-description pipeline for *request* and record the
        outcome (finished or failed) in the cache; returns the final Resource."""
        logger.info("Generate Semantics Description pipeline is running...")

        try:
            mdl_dict = orjson.loads(request.mdl)

            pipeline_input = {
                "user_prompt": request.user_prompt,
                "selected_models": request.selected_models,
                "mdl": mdl_dict,
            }

            resp = await self._pipelines["semantics_description"].run(**pipeline_input)

            self[request.id] = self.Resource(
                id=request.id, status="finished", response=resp.get("normalize")
            )
        except orjson.JSONDecodeError as e:
            # Bad MDL payload from the caller — report it distinctly.
            self._handle_exception(request, f"Failed to parse MDL: {str(e)}")
        except Exception as e:
            self._handle_exception(
                request,
                f"An error occurred during semantics description generation: {str(e)}",
            )

        return self[request.id]

    def __getitem__(self, id: str) -> Resource:
        response = self._cache.get(id)

        if response is None:
            # Unknown or expired ID: synthesize a failed Resource rather than raising.
            message = f"Semantics Description Resource with ID '{id}' not found."
            # logger.error, not logger.exception: there is no active exception
            # here, so .exception() would append a misleading empty traceback.
            logger.error(message)
            return self.Resource(
                id=id,
                status="failed",
                error=self.Resource.Error(code="OTHERS", message=message),
            )

        return response

    def __setitem__(self, id: str, value: Resource):
        self._cache[id] = value
from unittest.mock import AsyncMock

import pytest

from src.web.v1.services.semantics_description import SemanticsDescription

# A minimal valid MDL payload shared by the happy-path and exception tests.
VALID_MDL = '{"models": [{"name": "model1", "columns": []}]}'


@pytest.fixture
def semantics_description_service():
    """Build the service around a mocked pipeline that returns one described model."""
    pipeline_mock = AsyncMock()
    pipeline_mock.run.return_value = {
        "normalize": {
            "model1": {
                "columns": [],
                "properties": {"description": "Test description"},
            }
        }
    }

    return SemanticsDescription(pipelines={"semantics_description": pipeline_mock})


@pytest.mark.asyncio
async def test_generate_semantics_description(
    semantics_description_service: SemanticsDescription,
):
    # Happy path: the pipeline output is surfaced under the request's ID.
    req = SemanticsDescription.Input(
        id="test_id",
        user_prompt="Describe the model",
        selected_models=["model1"],
        mdl=VALID_MDL,
    )

    result = await semantics_description_service.generate(req)

    assert result.id == "test_id"
    assert result.status == "finished"
    assert result.error is None
    assert result.response == {
        "model1": {
            "columns": [],
            "properties": {"description": "Test description"},
        }
    }


@pytest.mark.asyncio
async def test_generate_semantics_description_with_invalid_mdl(
    semantics_description_service: SemanticsDescription,
):
    # A non-JSON MDL string must produce a failed resource, not an exception.
    req = SemanticsDescription.Input(
        id="test_id",
        user_prompt="Describe the model",
        selected_models=["model1"],
        mdl="invalid_json",
    )

    result = await semantics_description_service.generate(req)

    assert result.id == "test_id"
    assert result.status == "failed"
    assert result.response is None
    assert result.error.code == "OTHERS"
    assert "Failed to parse MDL" in result.error.message


@pytest.mark.asyncio
async def test_generate_semantics_description_with_exception(
    semantics_description_service: SemanticsDescription,
):
    # Any pipeline error is caught and reported as a failed resource.
    req = SemanticsDescription.Input(
        id="test_id",
        user_prompt="Describe the model",
        selected_models=["model1"],
        mdl=VALID_MDL,
    )

    pipeline = semantics_description_service._pipelines["semantics_description"]
    pipeline.run.side_effect = Exception("Test exception")

    result = await semantics_description_service.generate(req)

    assert result.id == "test_id"
    assert result.status == "failed"
    assert result.response is None
    assert result.error.code == "OTHERS"
    assert (
        "An error occurred during semantics description generation"
        in result.error.message
    )


def test_get_semantics_description_result(
    semantics_description_service: SemanticsDescription,
):
    # A cached resource is returned as-is via __getitem__.
    stored = SemanticsDescription.Resource(
        id="test_id",
        status="finished",
        response={"model1": {"description": "Test description"}},
    )
    semantics_description_service._cache["test_id"] = stored

    assert semantics_description_service["test_id"] == stored


def test_get_non_existent_semantics_description_result(
    semantics_description_service: SemanticsDescription,
):
    # An unknown ID yields a synthesized failed resource instead of raising.
    result = semantics_description_service["non_existent_id"]

    assert result.id == "non_existent_id"
    assert result.status == "failed"
    assert result.response is None
    assert result.error.code == "OTHERS"
    assert "not found" in result.error.message