
Feat: log pipeline status and pass it through pipeline [COG-1214] #501

Merged
merged 25 commits into from
Feb 11, 2025

Conversation

alekszievr
Contributor

@alekszievr alekszievr commented Feb 5, 2025

Description

DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin

Summary by CodeRabbit

  • New Features

    • Enhanced pipeline execution now provides consolidated status feedback with improved telemetry for start, completion, and error events.
    • Automatic generation of unique dataset identifiers offers clearer task and pipeline run associations.
  • Refactor

    • Task execution has been streamlined with explicit parameter handling for more structured pipeline processing.
    • Interactive examples and demos now return results directly, making integration and monitoring more accessible.


coderabbitai bot commented Feb 5, 2025

Walkthrough

This pull request makes extensive updates across several modules to improve pipeline and task execution. Changes include updated function signatures with added UUID-based identifiers, enhanced asynchronous logging for pipeline start, completion, and error states, and modifications to control flow in both API endpoints and internal operations. Test files and notebook demos have been adjusted accordingly, and legacy logging code has been removed.
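As a concrete illustration of the UUID-based identifiers the walkthrough mentions, Python's uuid5 derives a stable ID from a name. This is only a sketch: the helper and the dataset name below are hypothetical, and only the uuid5/NAMESPACE_OID pattern itself appears in the PR's diffs.

```python
# Sketch: deterministic IDs via uuid5. The dataset name "main_dataset"
# and the helper name are illustrative, not taken from the codebase.
from uuid import NAMESPACE_OID, uuid5

def dataset_uuid(name: str):
    # The same name always maps to the same UUID, so repeated runs on
    # the same dataset share one identifier for status lookups.
    return uuid5(NAMESPACE_OID, name)

print(dataset_uuid("main_dataset") == dataset_uuid("main_dataset"))  # prints True
```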

Changes

File(s) and change summary:

  • cognee/api/v1/add/add_v2.py, cognee/api/v1/cognify/cognify_v2.py, cognee/api/v1/cognify/code_graph_pipeline.py, examples/python/code_graph_example.py: Updated API endpoints and examples: added UUID generation for dataset IDs, modified run_tasks calls to explicitly include dataset_id, and shifted control flow to return a consolidated pipeline run status.
  • cognee/modules/pipelines/models/PipelineRun.py: Updated model: replaced run_id with pipeline_run_id, added pipeline_id and dataset_id columns, and added the necessary SQLAlchemy import.
  • cognee/modules/pipelines/operations/__init__.py, .../get_pipeline_status.py, .../run_tasks.py, .../log_pipeline_run_start.py, .../log_pipeline_run_complete.py, .../log_pipeline_run_error.py (and removed log_pipeline_status.py): Overhauled pipeline operations: modified the run_tasks signature to include dataset_id, integrated asynchronous logging functions for pipeline start, completion, and errors, and updated query filters to use dataset_id.
  • cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py, cognee/tests/integration/run_toy_tasks/run_tasks_test.py: Updated tests: replaced the import to use run_tasks_base and incorporated get_default_user, passing additional parameters (data, user).
  • notebooks/cognee_code_graph_demo.ipynb, notebooks/cognee_demo.ipynb: Modified notebooks: updated task calls to include UUID generation and introduced pipeline_run_status for asynchronous status tracking.
  • .pre-commit-config.yaml: Updated pre-commit hook: bumped the Ruff revision from v0.9.0 to v0.9.5.
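Taken together, the PipelineRun model changes give each run three identifiers instead of one. A plain dataclass stand-in makes the new shape explicit; this is only a sketch, since the real model is a SQLAlchemy table and the status values here are assumptions.

```python
# Sketch of the reshaped PipelineRun record. The real class is a
# SQLAlchemy model; the enum values below are illustrative assumptions.
import enum
from dataclasses import dataclass, field
from uuid import UUID, uuid4

class PipelineRunStatus(str, enum.Enum):
    STARTED = "DATASET_PROCESSING_STARTED"
    COMPLETED = "DATASET_PROCESSING_COMPLETED"
    ERRORED = "DATASET_PROCESSING_ERRORED"

@dataclass
class PipelineRunSketch:
    # run_id was replaced by pipeline_run_id; pipeline_id and dataset_id
    # are the new columns described in the change summary.
    pipeline_run_id: UUID
    pipeline_id: UUID
    dataset_id: UUID
    status: PipelineRunStatus
    run_info: dict = field(default_factory=dict)

run = PipelineRunSketch(uuid4(), uuid4(), uuid4(), PipelineRunStatus.STARTED)
```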

Sequence Diagram(s)

sequenceDiagram
    participant C as Caller
    participant RT as run_tasks
    participant LS as log_pipeline_run_start
    participant RWT as run_tasks_with_telemetry
    participant LC as log_pipeline_run_complete
    participant LE as log_pipeline_run_error

    C->>RT: Call run_tasks(tasks, dataset_id, data, pipeline_name)
    RT->>LS: log_pipeline_run_start(pipeline_id, dataset_id, data)
    LS-->>RT: Return start log result
    RT->>RWT: Execute run_tasks_with_telemetry(tasks, data)
    alt Execution Successful
        RT->>LC: log_pipeline_run_complete(pipeline_id, dataset_id, data)
        LC-->>RT: Return complete log result
    else Execution Fails
        RT->>LE: log_pipeline_run_error(pipeline_id, dataset_id, data, error)
        LE-->>RT: Return error log result
        RT-->>C: Raise exception
    end
    RT-->>C: Return final result
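The flow in the diagram can be sketched as an async generator that yields a status record at each stage. The logging stubs below are hypothetical stand-ins for the real log_pipeline_run_* functions, which persist PipelineRun rows rather than returning dicts.

```python
import asyncio

# Hypothetical stand-ins for log_pipeline_run_start/complete/error.
async def log_start(pipeline_id, dataset_id, data):
    return {"status": "DATASET_PROCESSING_STARTED"}

async def log_complete(pipeline_id, dataset_id, data):
    return {"status": "DATASET_PROCESSING_COMPLETED"}

async def log_error(pipeline_id, dataset_id, data, error):
    return {"status": "DATASET_PROCESSING_ERRORED", "error": str(error)}

async def run_tasks(tasks, dataset_id, data, pipeline_id):
    # Yield a status record at start, then one at completion or on error,
    # mirroring the branches in the sequence diagram.
    yield await log_start(pipeline_id, dataset_id, data)
    try:
        for task in tasks:  # stand-in for run_tasks_with_telemetry
            data = task(data)
        yield await log_complete(pipeline_id, dataset_id, data)
    except Exception as error:
        yield await log_error(pipeline_id, dataset_id, data, error)
        raise

async def main():
    return [s["status"] async for s in run_tasks([lambda d: d], "ds-1", [1], "demo_pipeline")]

statuses = asyncio.run(main())
print(statuses)  # ['DATASET_PROCESSING_STARTED', 'DATASET_PROCESSING_COMPLETED']
```

A caller consumes the generator with `async for`, keeping the last yielded record as the pipeline's final status, which matches how the notebooks track pipeline_run_status.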

Possibly related PRs

Suggested reviewers

  • borisarzentar

Poem

I'm a rabbit in a code-filled maze,
Hopping through pipelines in endless arrays.
UUIDs and logs now lead the way,
As asynchronous tales brighten the day.
With each updated line, I skip with glee,
Celebrating changes for you and me! 🐰✨



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🔭 Outside diff range comments (1)
cognee/modules/pipelines/operations/get_pipeline_status.py (1)

11-34: Add error handling for database operations.

The database operations in this function lack error handling, so any failure propagates as an unhandled exception instead of a clear, actionable error.

Apply this diff to add error handling:

     db_engine = get_relational_engine()
 
-    async with db_engine.get_async_session() as session:
-        query = (
-            select(
-                PipelineRun,
-                func.row_number()
-                .over(
-                    partition_by=PipelineRun.id,
-                    order_by=PipelineRun.created_at.desc(),
-                )
-                .label("rn"),
-            )
-            .filter(PipelineRun.id.in_(pipeline_ids))
-            .subquery()
-        )
-
-        aliased_pipeline_run = aliased(PipelineRun, query)
-
-        latest_runs = select(aliased_pipeline_run).filter(query.c.rn == 1)
-
-        runs = (await session.execute(latest_runs)).scalars().all()
-
-        pipeline_statuses = {str(run.id): run.status for run in runs}
-
-        return pipeline_statuses
+    try:
+        async with db_engine.get_async_session() as session:
+            query = (
+                select(
+                    PipelineRun,
+                    func.row_number()
+                    .over(
+                        partition_by=PipelineRun.pipeline_id,
+                        order_by=PipelineRun.created_at.desc(),
+                    )
+                    .label("rn"),
+                )
+                .filter(PipelineRun.pipeline_id.in_(pipeline_ids))
+                .subquery()
+            )
+
+            aliased_pipeline_run = aliased(PipelineRun, query)
+
+            latest_runs = select(aliased_pipeline_run).filter(query.c.rn == 1)
+
+            runs = (await session.execute(latest_runs)).scalars().all()
+
+            pipeline_statuses = {str(run.pipeline_id): run.status for run in runs}
+
+            return pipeline_statuses
+    except Exception as e:
+        raise RuntimeError(f"Failed to get pipeline statuses: {str(e)}") from e
🧹 Nitpick comments (5)
cognee/modules/pipelines/operations/run_tasks.py (1)

271-282: Consider yielding the results from run_tasks_with_telemetry
Currently, the results of run_tasks_with_telemetry are discarded. If callers need downstream access to those results, consider yielding them:

- async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
-     pass
+ async for result in run_tasks_with_telemetry(tasks, data, pipeline_id):
+     yield result
cognee/modules/pipelines/models/PipelineRun.py (1)

23-23: Consider a foreign key or uniqueness constraint for pipeline_id
If each pipeline ID is meant to map to a single pipeline, adding a foreign key relation (or uniqueness if applicable) might help maintain data consistency.
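If that direction is taken, a minimal sketch of the constraint could look like this. String-typed IDs are used here so the example runs on SQLite; the actual model uses UUID columns and cognee's own declarative base.

```python
from sqlalchemy import Column, ForeignKey, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Pipeline(Base):
    __tablename__ = "pipelines"
    id = Column(String(36), primary_key=True)

class PipelineRun(Base):
    __tablename__ = "pipeline_runs"
    id = Column(String(36), primary_key=True)
    # The foreign key ties every run to an existing pipelines row,
    # which is the consistency guarantee suggested above.
    pipeline_id = Column(String(36), ForeignKey("pipelines.id"), nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Pipeline(id="p1"))
    session.add(PipelineRun(id="r1", pipeline_id="p1"))
    session.commit()
    run = session.get(PipelineRun, "r1")
    print(run.pipeline_id)  # p1
```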

cognee/modules/pipelines/operations/logPipelineRunStart.py (1)

10-18: Rename the loop variable to avoid shadowing and clarify stored data
Inside the run_info dictionary, data is used for both the parameter and the loop variable, which is confusing. Rename the loop variable to something like datum, and confirm that storing references to the entire dataset is intended.

- "data": [str(data.id) for data in data] if isinstance(data, list) else data,
+ "data": [str(datum.id) for datum in data] if isinstance(data, list) else data,
cognee/api/v1/cognify/cognify_v2.py (2)

101-102: Add logging for pipeline run status changes.

Given that this PR aims to improve pipeline status logging, consider adding log statements to track status changes during execution.

Add logging to track status changes:

         async for run_status in pipeline_run:
+            logger.info("Pipeline status updated: %s", run_status)
             pipeline_run_status = run_status

108-109: Document the function's return type.

The function now returns a pipeline status, but this isn't reflected in the function signature. Consider adding type hints for better code documentation.

Update the function signature to include the return type:

-async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
+async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]) -> PipelineRunStatus:
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df163b0 and 3f27649.

📒 Files selected for processing (10)
  • cognee/api/v1/add/add_v2.py (1 hunks)
  • cognee/api/v1/cognify/cognify_v2.py (1 hunks)
  • cognee/modules/pipelines/models/PipelineRun.py (2 hunks)
  • cognee/modules/pipelines/operations/__init__.py (1 hunks)
  • cognee/modules/pipelines/operations/get_pipeline_status.py (2 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunComplete.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunError.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunStart.py (1 hunks)
  • cognee/modules/pipelines/operations/log_pipeline_status.py (0 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (2 hunks)
💤 Files with no reviewable changes (1)
  • cognee/modules/pipelines/operations/log_pipeline_status.py
✅ Files skipped from review due to trivial changes (1)
  • cognee/modules/pipelines/operations/__init__.py
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: docker-compose-test
🔇 Additional comments (1)
cognee/modules/pipelines/operations/run_tasks.py (1)

4-11: Imports look good
These imports correctly reference the new logging functions and the UUID, Data types. No issues found.

@alekszievr alekszievr self-assigned this Feb 5, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3f27649 and c9746ac.

📒 Files selected for processing (5)
  • cognee/modules/pipelines/operations/get_pipeline_status.py (2 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunComplete.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunError.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunStart.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • cognee/modules/pipelines/operations/logPipelineRunError.py
  • cognee/modules/pipelines/operations/logPipelineRunStart.py
  • cognee/modules/pipelines/operations/logPipelineRunComplete.py
🧰 Additional context used
🪛 GitHub Actions: test | neo4j
cognee/modules/pipelines/operations/run_tasks.py

[error] 272-272: Error in run_tasks function while awaiting pipeline run.

⏰ Context from checks skipped due to timeout of 90000ms (11)
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: profiler
  • GitHub Check: windows-latest
  • GitHub Check: docker-compose-test
  • GitHub Check: test
🔇 Additional comments (4)
cognee/modules/pipelines/operations/get_pipeline_status.py (2)

17-17: LGTM! Partitioning by dataset_id is correct.

The change to partition by dataset_id from the run_info JSON field is appropriate for grouping related pipeline runs.


31-31: LGTM! Dictionary key update aligns with model changes.

The change from run.run_id to run.id correctly reflects the model changes mentioned in the AI summary.

cognee/modules/pipelines/operations/run_tasks.py (2)

4-11: LGTM! Imports are well-organized.

The new imports support the enhanced logging functionality and type safety with UUID and Data types.


271-284: ⚠️ Potential issue

Fix error handling and add type hints for pipeline run object.

The function has several issues:

  1. The pipeline failure suggests potential issues with logPipelineRunStart
  2. Missing type hints for the yielded pipeline run object
  3. The error handling could be more robust

Let's verify the implementation of logPipelineRunStart:

Apply these improvements:

-async def run_tasks(tasks: list[Task], dataset_id: UUID, data: list[Data], pipeline_id: str):
+async def run_tasks(tasks: list[Task], dataset_id: UUID, data: list[Data], pipeline_id: str) -> AsyncGenerator[PipelineRun, None]:
     try:
         pipeline_run = await logPipelineRunStart(pipeline_id, dataset_id, data)
+        if not pipeline_run:
+            raise ValueError("Failed to create pipeline run")
 
         yield pipeline_run
 
         try:
             async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
                 pass
 
             yield await logPipelineRunComplete(pipeline_id, dataset_id, data)
 
         except Exception as e:
             yield await logPipelineRunError(pipeline_id, dataset_id, data, e)
             raise e
+    except Exception as e:
+        logger.error(f"Failed to start pipeline run: {str(e)}")
+        raise
✅ Verification successful

🏁 Script executed:

#!/bin/bash
# Search for logPipelineRunStart implementation
ast-grep --pattern 'async def logPipelineRunStart($_, $_, $_) {
  $$$
}'

Length of output: 74


🏁 Script executed:

#!/bin/bash
# Use ripgrep to search for the definition of logPipelineRunStart in the repository.
# This command looks for instances of the async function definition.
rg "async def logPipelineRunStart" .

Length of output: 184


Attention: Verification of Error Handling and Type Hints in run_tasks

The search confirmed that logPipelineRunStart is implemented in cognee/modules/pipelines/operations/logPipelineRunStart.py with the expected signature. Based on this, the recommended changes in the review comment—to add a return type hint (AsyncGenerator[PipelineRun, None]), include a check on the result of logPipelineRunStart (raising a ValueError if it returns a falsy value), and log errors more robustly—are appropriate and address the identified issues.

  • The modified signature now clearly communicates that the function yields a PipelineRun object.
  • The new check (if not pipeline_run: raise ValueError(...)) ensures that failures in starting a pipeline run are caught immediately.
  • Additional error logging before re-raising improves transparency into failures.

These improvements should enhance the code’s reliability and clarity.

🧰 Tools
🪛 GitHub Actions: test | neo4j

[error] 272-272: Error in run_tasks function while awaiting pipeline run.


@coderabbitai coderabbitai bot left a comment


Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py (1)

23-32: Fix pipeline_name parameter and type safety issues.

The test is failing because:

  1. pipeline_name parameter has been renamed to pipeline_id in the run_tasks function
  2. data parameter must be a list of Data objects, not a string

Apply this diff to fix the issues:

    tasks_run = run_tasks(
        [
            Task(queue_consumer),
            Task(add_one),
            Task(multiply_by_two),
        ],
        dataset_id=uuid4(),
-       data=str(data_queue),
+       data=[Data(content=str(data_queue))],
-       pipeline_name="test_run_tasks_from_queue",
+       pipeline_id="test_run_tasks_from_queue",
    )

Don't forget to add the import:

from cognee.modules.data.models import Data
🧰 Tools
🪛 GitHub Actions: test | python 3.11

[error] 23-23: TypeError: run_tasks() got an unexpected keyword argument 'pipeline_name'

🪛 GitHub Actions: test | python 3.12

[error] 23-23: TypeError: run_tasks() got an unexpected keyword argument 'pipeline_name'

🪛 GitHub Actions: test | python 3.10

[error] 23-23: TypeError: run_tasks() got an unexpected keyword argument 'pipeline_name'

🛑 Comments failed to post (1)
cognee/modules/pipelines/operations/run_tasks.py (1)

271-271: ⚠️ Potential issue

Add type validation for data parameter.

The pipeline failures indicate that the data parameter needs type validation to ensure all items are Data objects.

Add type validation at the start of the function:

 async def run_tasks(tasks: list[Task], dataset_id: UUID, data: list[Data], pipeline_id: str):
+    if not all(isinstance(item, Data) for item in data):
+        raise TypeError("All items in the data list must be of type Data")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

async def run_tasks(tasks: list[Task], dataset_id: UUID, data: list[Data], pipeline_id: str):
    if not all(isinstance(item, Data) for item in data):
        raise TypeError("All items in the data list must be of type Data")
    # ... rest of the function implementation ...

@alekszievr alekszievr marked this pull request as draft February 5, 2025 22:43

gitguardian bot commented Feb 10, 2025

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
  • GitGuardian id: 9573981, status: Triggered, secret type: Generic Password, commit: 1bd82f2, filename: docker-compose.yml
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider



@alekszievr alekszievr marked this pull request as ready for review February 11, 2025 08:54

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
cognee/modules/pipelines/operations/logPipelineRunComplete.py (1)

28-32: ⚠️ Potential issue

Add error handling for database operations.

The database operations lack error handling which could lead to unhandled exceptions.

     db_engine = get_relational_engine()
 
-    async with db_engine.get_async_session() as session:
-        session.add(pipeline_run)
-        await session.commit()
+    try:
+        async with db_engine.get_async_session() as session:
+            session.add(pipeline_run)
+            await session.commit()
+    except Exception as e:
+        raise RuntimeError(f"Failed to log pipeline run completion: {str(e)}") from e
🧹 Nitpick comments (3)
cognee/modules/pipelines/operations/get_pipeline_status.py (1)

18-29: Add index hint for query optimization.

The query filters and partitions on JSON fields, which can be slow without proper indexing.

Consider adding an index on the extracted dataset_id to improve query performance:

CREATE INDEX idx_pipeline_run_dataset_id ON pipeline_run ((run_info->>'dataset_id'));
cognee/modules/pipelines/operations/run_tasks.py (1)

271-285: Consider adding docstring and type hints for return value.

The function lacks documentation about its return value and behavior.

Apply this diff to add documentation:

 async def run_tasks(tasks: list[Task], dataset_id: UUID, data: Any, pipeline_id: str):
+    """Execute tasks and track pipeline status.
+    
+    Args:
+        tasks: List of tasks to execute
+        dataset_id: UUID of the dataset
+        data: Input data for tasks
+        pipeline_id: Identifier for the pipeline
+        
+    Yields:
+        PipelineRun: Status updates for the pipeline execution
+        
+    Raises:
+        Exception: If any task fails during execution
+    """
     pipeline_run = await logPipelineRunStart(pipeline_id, dataset_id, data)
notebooks/cognee_demo.ipynb (1)

677-678: LGTM! Consider logging status changes.

The improved structure with explicit parameters and status tracking is good. However, consider logging status changes for better observability.

 pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
 pipeline_run_status = None
+logger.info(f"Starting pipeline run with dataset_id: {dataset.id}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5fd1264 and 5961ade.

📒 Files selected for processing (14)
  • .pre-commit-config.yaml (1 hunks)
  • cognee/api/v1/add/add_v2.py (2 hunks)
  • cognee/api/v1/cognify/code_graph_pipeline.py (1 hunks)
  • cognee/api/v1/cognify/cognify_v2.py (2 hunks)
  • cognee/modules/pipelines/operations/get_pipeline_status.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunComplete.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunError.py (1 hunks)
  • cognee/modules/pipelines/operations/logPipelineRunStart.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (2 hunks)
  • cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py (3 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_test.py (2 hunks)
  • examples/python/code_graph_example.py (1 hunks)
  • notebooks/cognee_code_graph_demo.ipynb (2 hunks)
  • notebooks/cognee_demo.ipynb (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .pre-commit-config.yaml
🚧 Files skipped from review as they are similar to previous changes (4)
  • cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py
  • cognee/modules/pipelines/operations/logPipelineRunStart.py
  • cognee/modules/pipelines/operations/logPipelineRunError.py
  • cognee/api/v1/add/add_v2.py
🧰 Additional context used
🪛 GitHub Actions: test | code graph example
examples/python/code_graph_example.py

[error] 40-40: UncaughtAttributeError: 'CompiledModule' object has no attribute 'non_stub_value_set'


[error] 10-10: UncaughtAttributeError: 'CompiledModule' object has no attribute 'non_stub_value_set'

🔇 Additional comments (7)
cognee/tests/integration/run_toy_tasks/run_tasks_test.py (1)

23-33: Add assertions for user parameter impact.

The test adds a user parameter but doesn't verify its impact on the pipeline execution.

Consider adding assertions to verify:

  1. The user is correctly passed to the pipeline
  2. The pipeline behavior differs with different users
  3. The pipeline handles invalid user scenarios
cognee/api/v1/cognify/code_graph_pipeline.py (1)

72-77: LGTM: Pipeline status tracking for non-code tasks.

The implementation correctly captures and tracks the pipeline run status for non-code tasks.

cognee/api/v1/cognify/cognify_v2.py (2)

96-97: Add await to the run_tasks call.

The run_tasks function appears to be asynchronous based on its usage with async for on line 99, but the call is missing the await keyword.


99-101: LGTM: Pipeline status tracking.

The implementation correctly captures and tracks the pipeline run status.

notebooks/cognee_code_graph_demo.ipynb (1)

95-101: LGTM: UUID generation for pipeline tasks.

The implementation correctly generates a UUID for the dataset ID using the repository location.

cognee/modules/pipelines/operations/run_tasks.py (2)

4-11: LGTM: New imports for enhanced functionality.

The imports correctly support the new pipeline status logging functionality.


277-278: Consider yielding task results.

The current implementation discards task results by using pass. Consider whether these results might be useful for monitoring or debugging.

Would you like me to help verify if any consumers of this API rely on the task results?


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (5)
cognee/modules/pipelines/models/PipelineRun.py (1)

4-4: Remove unused import.

The String type is imported but not used in the code.

Apply this diff to remove the unused import:

-from sqlalchemy import Column, DateTime, JSON, Enum, UUID, String
+from sqlalchemy import Column, DateTime, JSON, Enum, UUID
cognee/modules/pipelines/operations/log_pipeline_run_start.py (1)

8-34: Add docstring and improve error handling.

The function implementation looks good but could benefit from documentation and better error handling.

Apply these improvements:

 async def log_pipeline_run_start(pipeline_id: str, dataset_id: UUID, data: Any):
+    """Log the start of a pipeline run.
+    
+    Args:
+        pipeline_id (str): The ID of the pipeline.
+        dataset_id (UUID): The ID of the dataset being processed.
+        data (Any): The data being processed.
+        
+    Returns:
+        PipelineRun: The created pipeline run record.
+        
+    Raises:
+        ValueError: If pipeline_id or dataset_id is None.
+    """
+    if not pipeline_id or not dataset_id:
+        raise ValueError("pipeline_id and dataset_id must not be None")
+
     if not data:
         data_info = "None"
     elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
cognee/modules/pipelines/operations/log_pipeline_run_error.py (1)

11-16: Consider using a more robust data serialization approach.

The current data handling logic might not handle all edge cases effectively. Consider using a dedicated serialization method or JSON encoder to handle complex data types consistently.

```diff
-    if not data:
-        data_info = "None"
-    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
-        data_info = [str(item.id) for item in data]
-    else:
-        data_info = str(data)
+    def serialize_data(data: Any) -> str:
+        if not data:
+            return "None"
+        if isinstance(data, list) and all(isinstance(item, Data) for item in data):
+            return json.dumps([str(item.id) for item in data])
+        try:
+            return json.dumps(data, default=str)
+        except (TypeError, ValueError):
+            return str(data)
+
+    data_info = serialize_data(data)
```
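The fallback behavior of the suggested helper can be sketched with only the standard library. The `Data` class below is a hypothetical stand-in for cognee's data model (only its `.id` attribute matters here):

```python
# Sketch of the suggested serialize_data fallback, standard library only.
# Data is a hypothetical stand-in for cognee's Data model.
import json
from dataclasses import dataclass
from typing import Any
from uuid import UUID


@dataclass
class Data:  # stand-in: only .id is relevant to serialization
    id: UUID


def serialize_data(data: Any) -> str:
    if not data:
        return "None"
    if isinstance(data, list) and all(isinstance(item, Data) for item in data):
        return json.dumps([str(item.id) for item in data])
    try:
        # default=str lets non-JSON types (datetime, UUID, ...) degrade gracefully
        return json.dumps(data, default=str)
    except (TypeError, ValueError):
        return str(data)


items = [Data(UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"))]
print(serialize_data(items))        # JSON list of string ids
print(serialize_data({"count": 2})) # plain JSON object
print(serialize_data(None))         # "None"
```

Note that an empty list is falsy and therefore also serializes to `"None"`, which may or may not be the desired behavior.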
cognee/modules/pipelines/operations/run_tasks.py (2)

273-273: Consider using a more specific UUID namespace.

Using NAMESPACE_OID for pipeline IDs might not be the most appropriate choice. Consider using a custom namespace UUID specific to your application domain.

```diff
-    pipeline_id = uuid5(NAMESPACE_OID, pipeline_name)
+    PIPELINE_NAMESPACE = UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')  # Replace with your application-specific namespace
+    pipeline_id = uuid5(PIPELINE_NAMESPACE, pipeline_name)
```
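For illustration, `uuid5` with a fixed namespace is fully deterministic: the same (namespace, name) pair always yields the same UUID, while a different namespace keeps ids from unrelated domains apart. The namespace value below is an example, not a recommendation:

```python
# Demonstrates why a dedicated namespace gives stable pipeline ids.
from uuid import NAMESPACE_OID, UUID, uuid5

PIPELINE_NAMESPACE = UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")  # example value only

a = uuid5(PIPELINE_NAMESPACE, "cognify_pipeline")
b = uuid5(PIPELINE_NAMESPACE, "cognify_pipeline")
c = uuid5(NAMESPACE_OID, "cognify_pipeline")

print(a == b)  # deterministic: same namespace + name -> same UUID
print(a == c)  # different namespace -> different UUID
```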

280-288: Consider adding retry logic for transient failures.

The error handling is basic and might not handle transient failures well. Consider implementing a retry mechanism for recoverable errors.

```diff
+    from tenacity import retry, stop_after_attempt, wait_exponential
+
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
+    async def execute_tasks_with_retry(tasks, data, pipeline_id):
+        async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
+            pass
+
     try:
-        async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
-            pass
+        await execute_tasks_with_retry(tasks, data, pipeline_id)
 
         yield await log_pipeline_run_complete(pipeline_run_id, pipeline_id, dataset_id, data)
 
     except Exception as e:
+        logger.error(f"Pipeline execution failed after retries: {e}")
         yield await log_pipeline_run_error(pipeline_run_id, pipeline_id, dataset_id, data, e)
         raise e
```
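The diff above relies on the tenacity library; the same retry-with-exponential-backoff pattern can be sketched without any dependency. The helper and the `flaky` coroutine below are illustrative, not part of cognee:

```python
# Dependency-free sketch of retry with exponential backoff for an async call.
import asyncio


async def retry_async(coro_factory, attempts=3, base_delay=0.01):
    last_exc = None
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except Exception as exc:  # in production, catch only known-transient errors
            last_exc = exc
            await asyncio.sleep(base_delay * (2 ** attempt))  # 0.01s, 0.02s, 0.04s, ...
    raise last_exc


calls = {"n": 0}


async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = asyncio.run(retry_async(flaky))
print(result, calls["n"])  # succeeds on the third attempt
```

Catching bare `Exception` in the retry loop is deliberately simplistic; a real implementation should retry only errors known to be transient, as the review notes.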
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5961ade and 0b63bf9.

📒 Files selected for processing (8)
  • cognee/api/v1/add/add_v2.py (2 hunks)
  • cognee/modules/pipelines/models/PipelineRun.py (2 hunks)
  • cognee/modules/pipelines/operations/__init__.py (1 hunks)
  • cognee/modules/pipelines/operations/get_pipeline_status.py (3 hunks)
  • cognee/modules/pipelines/operations/log_pipeline_run_complete.py (1 hunks)
  • cognee/modules/pipelines/operations/log_pipeline_run_error.py (1 hunks)
  • cognee/modules/pipelines/operations/log_pipeline_run_start.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cognee/modules/pipelines/operations/__init__.py
  • cognee/api/v1/add/add_v2.py
⏰ Context from checks skipped due to timeout of 90000ms (19)
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: windows-latest
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: docker-compose-test
🔇 Additional comments (2)
cognee/modules/pipelines/models/PipelineRun.py (1)

22-24: LGTM!

The new columns are correctly defined with appropriate types and indexing, which will help in tracking pipeline runs effectively.

cognee/modules/pipelines/operations/get_pipeline_status.py (1)

8-34: LGTM!

The changes correctly implement the transition from run_id to dataset_id. The query structure is sound, with proper partitioning and filtering based on dataset IDs.

@alekszievr alekszievr merged commit 05ba29a into dev Feb 11, 2025
26 of 29 checks passed
@alekszievr alekszievr deleted the fear/metrics branch February 11, 2025 15:41