Feat: log pipeline status and pass it through pipeline [COG-1214] #501

Merged · 25 commits · Feb 11, 2025
Changes shown from 22 of 25 commits

Commits
6d9fd17
feat: log pipeline runs
borisarzentar Feb 4, 2025
c5c61dc
Merge branch 'dev' into fear/metrics
alekszievr Feb 5, 2025
01061ae
logging pipeline complete and error
alekszievr Feb 5, 2025
e7bf4c5
pipeline run logging
alekszievr Feb 5, 2025
3f27649
remove legacy pipeline logging
alekszievr Feb 5, 2025
c9746ac
handle errors and use correct sql query
alekszievr Feb 5, 2025
56eb9df
filter for all pipeline ids
alekszievr Feb 5, 2025
3fe998d
Merge branch 'dev' into fear/metrics
alekszievr Feb 5, 2025
3d121d5
adjust tests
alekszievr Feb 5, 2025
cf10922
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
3218e5e
adjust tests to new run_tasks signature
alekszievr Feb 6, 2025
4a1ef00
get pipeline status in a db engine compatible way
alekszievr Feb 10, 2025
1e40d03
adjust code graph pipeline to new run_tasks signature
alekszievr Feb 10, 2025
6f6e4bb
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
f35a980
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
8c7d12a
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
8ea140b
adjust integration tests
alekszievr Feb 10, 2025
1bd82f2
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
1fe9c0c
Use run_tasks_base in integration tests
alekszievr Feb 10, 2025
329ac20
More general type for data in pipeline run
alekszievr Feb 10, 2025
00baa81
Dataset id in add
alekszievr Feb 10, 2025
f9d38b8
adjust notebooks
alekszievr Feb 10, 2025
1cb5e6d
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
5961ade
string conversion of data
alekszievr Feb 11, 2025
0b63bf9
index dataset id, search by dataset id, generate pipeline id from pip…
alekszievr Feb 11, 2025
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -10,7 +10,7 @@ repos:
       - id: check-added-large-files
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: v0.9.0
+    rev: v0.9.5
     hooks:
       # Run the linter.
       - id: ruff
4 changes: 3 additions & 1 deletion cognee/api/v1/add/add_v2.py
@@ -9,6 +9,7 @@
 from cognee.infrastructure.databases.vector.pgvector import (
     create_db_and_tables as create_pgvector_db_and_tables,
 )
+from uuid import uuid5, NAMESPACE_OID


 async def add(
@@ -37,7 +38,8 @@ async def add(

     tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]

-    pipeline = run_tasks(tasks, data, "add_pipeline")
+    dataset_id = uuid5(NAMESPACE_OID, dataset_name)
+    pipeline = run_tasks(tasks=tasks, dataset_id=dataset_id, data=data, pipeline_id="add_pipeline")

     async for result in pipeline:
         print(result)
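
Deriving dataset_id with uuid5 makes it deterministic: the same dataset name always yields the same UUID, so repeated calls to add log runs against the same dataset. A minimal standard-library sketch of that property:

    from uuid import uuid5, NAMESPACE_OID

    # uuid5 is name-based and deterministic: equal names give equal ids.
    assert uuid5(NAMESPACE_OID, "my_dataset") == uuid5(NAMESPACE_OID, "my_dataset")
    assert uuid5(NAMESPACE_OID, "my_dataset") != uuid5(NAMESPACE_OID, "other_dataset")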
18 changes: 14 additions & 4 deletions cognee/api/v1/cognify/code_graph_pipeline.py
@@ -69,9 +69,19 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
         ),
     ]

+    pipeline_run_status = None
     if include_docs:
-        async for result in run_tasks(non_code_tasks, repo_path):
-            yield result
+        non_code_pipeline_run = run_tasks(non_code_tasks, None, repo_path, "cognify_pipeline")
+        async for run_status in non_code_pipeline_run:
+            pipeline_run_status = run_status

-    async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
-        yield result
+    from cognee.modules.data.methods import get_datasets
+
+    existing_datasets = await get_datasets(user.id)
+    code_pipeline_run = run_tasks(
+        tasks, existing_datasets[0].id, repo_path, "cognify_code_pipeline"
+    )
+    async for run_status in code_pipeline_run:
+        pipeline_run_status = run_status
+
+    return pipeline_run_status
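
The pattern above — drain the pipeline generator while keeping only the last yielded value — is how every caller in this PR obtains a final run status. A sketch of the idiom, using the names from this diff:

    # Each yielded value is a persisted PipelineRun; the last one is the final status.
    pipeline_run_status = None
    async for run_status in run_tasks(tasks, dataset_id, data, "cognify_pipeline"):
        pipeline_run_status = run_status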
41 changes: 7 additions & 34 deletions cognee/api/v1/cognify/cognify_v2.py
@@ -12,7 +12,6 @@
 from cognee.modules.pipelines import run_tasks
 from cognee.modules.pipelines.models import PipelineRunStatus
 from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
-from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
 from cognee.modules.pipelines.tasks.Task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.models import User
@@ -73,8 +72,6 @@ async def cognify(
 async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
     data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)

-    document_ids_str = [str(document.id) for document in data_documents]
-
     dataset_id = dataset.id
     dataset_name = generate_dataset_name(dataset.name)

@@ -84,21 +81,12 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
     task_status = await get_pipeline_status([dataset_id])

     if (
-        dataset_id in task_status
-        and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED
+        str(dataset_id) in task_status
+        and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
     ):
         logger.info("Dataset %s is already being processed.", dataset_name)
         return

-    await log_pipeline_status(
-        dataset_id,
-        PipelineRunStatus.DATASET_PROCESSING_STARTED,
-        {
-            "dataset_name": dataset_name,
-            "files": document_ids_str,
-        },
-    )
-
     try:
         if not isinstance(tasks, list):
             raise ValueError("Tasks must be a list")
@@ -107,34 +95,19 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
             if not isinstance(task, Task):
                 raise ValueError(f"Task {task} is not an instance of Task")

-        pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
+        pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
+        pipeline_run_status = None

-        async for result in pipeline:
-            print(result)
+        async for run_status in pipeline_run:
+            pipeline_run_status = run_status

         await index_graph_edges()

         send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)
-
-        await log_pipeline_status(
-            dataset_id,
-            PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
-            {
-                "dataset_name": dataset_name,
-                "files": document_ids_str,
-            },
-        )
+        return pipeline_run_status
     except Exception as error:
         send_telemetry("cognee.cognify EXECUTION ERRORED", user.id)

-        await log_pipeline_status(
-            dataset_id,
-            PipelineRunStatus.DATASET_PROCESSING_ERRORED,
-            {
-                "dataset_name": dataset_name,
-                "files": document_ids_str,
-            },
-        )
         raise error
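
Because run_info is stored as JSON, dataset ids round-trip as strings, which is why the gate now compares str(dataset_id) against the keys returned by get_pipeline_status. A condensed sketch of the check, names as in the diff:

    task_status = await get_pipeline_status([dataset_id])

    # Keys are stringified dataset ids pulled back out of the run_info JSON.
    if task_status.get(str(dataset_id)) == PipelineRunStatus.DATASET_PROCESSING_STARTED:
        logger.info("Dataset %s is already being processed.", dataset_name)
        return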
4 changes: 2 additions & 2 deletions cognee/modules/pipelines/models/PipelineRun.py
@@ -1,7 +1,7 @@
 import enum
 from uuid import uuid4
 from datetime import datetime, timezone
-from sqlalchemy import Column, DateTime, JSON, Enum, UUID
+from sqlalchemy import Column, DateTime, JSON, Enum, UUID, String
 from cognee.infrastructure.databases.relational import Base


@@ -20,5 +20,5 @@ class PipelineRun(Base):

     status = Column(Enum(PipelineRunStatus))

-    run_id = Column(UUID, index=True)
+    pipeline_id = Column(String, index=True)
     run_info = Column(JSON)
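
After this change a run is keyed by a free-form pipeline_id string (e.g. "add_pipeline") instead of a UUID, and the dataset id lives inside the run_info JSON. A sketch of how the model reads post-change — the id, created_at, and table name are inferred from the imports and surrounding context, not shown in this diff:

    class PipelineRun(Base):
        __tablename__ = "pipeline_runs"  # assumed name

        id = Column(UUID, primary_key=True, default=uuid4)
        created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

        status = Column(Enum(PipelineRunStatus))

        pipeline_id = Column(String, index=True)  # e.g. "add_pipeline", "cognify_pipeline"
        run_info = Column(JSON)  # {"dataset_id": "...", "data": [...], "error": "..."}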
3 changes: 3 additions & 0 deletions cognee/modules/pipelines/operations/__init__.py
@@ -0,0 +1,3 @@
+from .logPipelineRunStart import logPipelineRunStart
+from .logPipelineRunComplete import logPipelineRunComplete
+from .logPipelineRunError import logPipelineRunError
23 changes: 9 additions & 14 deletions cognee/modules/pipelines/operations/get_pipeline_status.py
@@ -7,38 +7,33 @@

 async def get_pipeline_status(pipeline_ids: list[UUID]):
     db_engine = get_relational_engine()
+    dialect = db_engine.engine.dialect.name

     async with db_engine.get_async_session() as session:
+        if dialect == "sqlite":
+            dataset_id_column = func.json_extract(PipelineRun.run_info, "$.dataset_id")
+        else:
+            dataset_id_column = PipelineRun.run_info.op("->>")("dataset_id")
+
         query = (
             select(
                 PipelineRun,
                 func.row_number()
                 .over(
-                    partition_by=PipelineRun.run_id,
+                    partition_by=dataset_id_column,
                     order_by=PipelineRun.created_at.desc(),
                 )
                 .label("rn"),
             )
-            .filter(PipelineRun.run_id.in_(pipeline_ids))
+            .filter(dataset_id_column.in_([str(id) for id in pipeline_ids]))
             .subquery()
         )

         aliased_pipeline_run = aliased(PipelineRun, query)

         latest_runs = select(aliased_pipeline_run).filter(query.c.rn == 1)

         runs = (await session.execute(latest_runs)).scalars().all()

-        pipeline_statuses = {str(run.run_id): run.status for run in runs}
+        pipeline_statuses = {run.run_info["dataset_id"]: run.status for run in runs}

         return pipeline_statuses
-
-    # f"""SELECT data_id, status
-    #     FROM (
-    #         SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
-    #         FROM cognee.cognee.task_runs
-    #         WHERE data_id IN ({formatted_data_ids})
-    #     ) t
-    #     WHERE rn = 1;"""
-
-    # return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }
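
The two branches build equivalent JSON lookups for the supported engines, since SQLite lacks Postgres's ->> operator. A sketch of what each SQLAlchemy expression compiles to (illustrative SQL; table name assumed):

    from sqlalchemy import func

    # SQLite: use the json_extract() function.
    dataset_id_column = func.json_extract(PipelineRun.run_info, "$.dataset_id")
    # ~ json_extract(pipeline_runs.run_info, '$.dataset_id')

    # Postgres: extract the value as text with ->>.
    dataset_id_column = PipelineRun.run_info.op("->>")("dataset_id")
    # ~ pipeline_runs.run_info ->> 'dataset_id'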
34 changes: 34 additions & 0 deletions cognee/modules/pipelines/operations/logPipelineRunComplete.py
@@ -0,0 +1,34 @@
+from uuid import UUID, uuid4
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
+from typing import Any
+
+
+async def logPipelineRunComplete(pipeline_id: UUID, dataset_id: UUID, data: Any):
+    if not data:
+        data_info = "None"
+    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
+        data_info = [str(item.id) for item in data]
+    else:
+        data_info = data
+
+    pipeline_run_id = uuid4()
+
+    pipeline_run = PipelineRun(
+        id=pipeline_run_id,
+        pipeline_id=pipeline_id,
+        status=PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
+        run_info={
+            "dataset_id": str(dataset_id),
+            "data": data_info,
+        },
+    )
+
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        session.add(pipeline_run)
+        await session.commit()
+
+    return pipeline_run
35 changes: 35 additions & 0 deletions cognee/modules/pipelines/operations/logPipelineRunError.py
@@ -0,0 +1,35 @@
+from uuid import UUID, uuid4
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
+from typing import Any
+
+
+async def logPipelineRunError(pipeline_id: str, dataset_id: UUID, data: Any, e: Exception):
+    if not data:
+        data_info = "None"
+    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
+        data_info = [str(item.id) for item in data]
+    else:
+        data_info = data
+
+    pipeline_run_id = uuid4()
+
+    pipeline_run = PipelineRun(
+        id=pipeline_run_id,
+        pipeline_id=pipeline_id,
+        status=PipelineRunStatus.DATASET_PROCESSING_ERRORED,
+        run_info={
+            "dataset_id": str(dataset_id),
+            "data": data_info,
+            "error": str(e),
+        },
+    )
+
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        session.add(pipeline_run)
+        await session.commit()
+
+    return pipeline_run
34 changes: 34 additions & 0 deletions cognee/modules/pipelines/operations/logPipelineRunStart.py
@@ -0,0 +1,34 @@
+from uuid import UUID, uuid4
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
+from typing import Any
+
+
+async def logPipelineRunStart(pipeline_id: str, dataset_id: UUID, data: Any):
+    if not data:
+        data_info = "None"
+    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
+        data_info = [str(item.id) for item in data]
+    else:
+        data_info = data
+
+    pipeline_run_id = uuid4()
+
+    pipeline_run = PipelineRun(
+        id=pipeline_run_id,
+        pipeline_id=pipeline_id,
+        status=PipelineRunStatus.DATASET_PROCESSING_STARTED,
+        run_info={
+            "dataset_id": str(dataset_id),
+            "data": data_info,
+        },
+    )
+
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        session.add(pipeline_run)
+        await session.commit()
+
+    return pipeline_run
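
All three loggers share the same run_info normalization: falsy data becomes the string "None", a list of Data records is reduced to its ids, and anything else is stored as-is. A usage sketch, with dataset_id and data_documents assumed in scope:

    run = await logPipelineRunStart("add_pipeline", dataset_id, None)                 # run_info["data"] == "None"
    run = await logPipelineRunStart("cognify_pipeline", dataset_id, data_documents)   # list of str ids
    run = await logPipelineRunStart("cognify_code_pipeline", dataset_id, "/some/repo")  # stored as-is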
18 changes: 0 additions & 18 deletions cognee/modules/pipelines/operations/log_pipeline_status.py

This file was deleted.

26 changes: 22 additions & 4 deletions cognee/modules/pipelines/operations/run_tasks.py
@@ -1,7 +1,14 @@
 import inspect
 import json
 import logging

+from uuid import UUID
+
+from typing import Any
+from cognee.modules.pipelines.operations import (
+    logPipelineRunStart,
+    logPipelineRunComplete,
+    logPipelineRunError,
+)
 from cognee.modules.settings import get_current_settings
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.models import User
@@ -261,6 +268,17 @@ async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
         raise error


-async def run_tasks(tasks: list[Task], data=None, pipeline_name: str = "default_pipeline"):
-    async for result in run_tasks_with_telemetry(tasks, data, pipeline_name):
-        yield result
+async def run_tasks(tasks: list[Task], dataset_id: UUID, data: Any, pipeline_id: str):
+    pipeline_run = await logPipelineRunStart(pipeline_id, dataset_id, data)
+
+    yield pipeline_run
+
+    try:
+        async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
+            pass
+
+        yield await logPipelineRunComplete(pipeline_id, dataset_id, data)
+
+    except Exception as e:
+        yield await logPipelineRunError(pipeline_id, dataset_id, data, e)
+        raise e
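
run_tasks is now an async generator of PipelineRun records rather than of task results: it yields the STARTED run up front, then either the COMPLETED run, or the ERRORED run followed by re-raising the exception. A consumption sketch:

    try:
        async for pipeline_run in run_tasks(tasks, dataset_id, data, "cognify_pipeline"):
            print(pipeline_run.status)  # STARTED, then COMPLETED or ERRORED
    except Exception:
        # The ERRORED run was already yielded and persisted before the re-raise.
        raise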
13 changes: 10 additions & 3 deletions cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py
@@ -1,8 +1,9 @@
 import asyncio
 from queue import Queue

-from cognee.modules.pipelines.operations.run_tasks import run_tasks
+from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
 from cognee.modules.pipelines.tasks.Task import Task
+from cognee.modules.users.methods import get_default_user


 async def pipeline(data_queue):
@@ -19,13 +20,15 @@ async def add_one(num):
     async def multiply_by_two(num):
         yield num * 2

-    tasks_run = run_tasks(
+    user = await get_default_user()
+    tasks_run = run_tasks_base(
         [
             Task(queue_consumer),
             Task(add_one),
             Task(multiply_by_two),
         ],
-        pipeline_name="test_run_tasks_from_queue",
+        data=None,
+        user=user,
     )

     results = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
@@ -50,3 +53,7 @@ async def queue_producer():

 def test_run_tasks_from_queue():
     asyncio.run(run_queue())
+
+
+if __name__ == "__main__":
+    asyncio.run(run_queue())