@rmidhun23
Contributor

@rmidhun23 rmidhun23 commented Jul 7, 2025

Closes: #52723

Following up with a PR to fix the issue.

Why

  1. When using the Neo4jOperator to execute cypher queries, the parameters argument is not being passed correctly. The operator is designed to accept a parameters argument, but this argument is not utilized.

  2. The Neo4jOperator should correctly pass the parameters argument to the cypher query execution. This would allow for dynamic queries that can accept parameters at runtime.

How

  1. Update Neo4jHook to accept parameters when the operator is executed.

  2. Pass parameters argument to Neo4j driver session for queries with schema or schemaless operations.

  3. Add tests for coverage.
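
The change in steps 1 and 2 can be sketched roughly as follows. This is an illustration rather than the provider's code: `FakeDriver`, `FakeSession`, and `run_query` are hypothetical names, and the fake session only records what it receives. The one assumption carried over from the Neo4j Python driver is that `session.run` takes the query text followed by an optional parameters mapping.

```python
from typing import Any, Optional


class FakeSession:
    """Hypothetical stand-in for a driver session; records calls instead of querying."""

    def __init__(self, calls: list, database: Optional[str] = None) -> None:
        self.calls = calls
        self.database = database

    def __enter__(self) -> "FakeSession":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Returning None lets any exception propagate to the caller.
        return None

    def run(self, query: str, parameters: Optional[dict] = None) -> list:
        # Record the database, query text, and parameters that were received.
        self.calls.append((self.database, query, parameters))
        return []


class FakeDriver:
    """Hypothetical stand-in for a driver; hands out recording sessions."""

    def __init__(self) -> None:
        self.calls: list = []

    def session(self, database: Optional[str] = None) -> FakeSession:
        return FakeSession(self.calls, database)


def run_query(
    driver: Any,
    query: str,
    parameters: Optional[dict] = None,
    schema: Optional[str] = None,
) -> list:
    """Forward `parameters` to the session on both the schema and schemaless paths."""
    if schema:
        with driver.session(database=schema) as session:
            return session.run(query, parameters)
    with driver.session() as session:
        return session.run(query, parameters)
```

The point of the sketch is only that both branches forward the same `parameters` value, and that a call without parameters still works because the argument defaults to `None`.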

Deployment Details

OS: macOS Sequoia Version 15.4.1 (24E263)
Docker: 20.10.14
Kubernetes: (Client Version: v1.22.5, Server Version: v1.33.1)
kind: v0.29.0 go1.24.2 darwin/arm64

Airflow: 2.10.5
Helm Chart: airflow-1.16.0 (2.10.5)

neo4j: 2025.04.0
Helm Chart: neo4j-2025.4.0  App Version: 2025.04.0  
"""
Dag to fix the neo4j provider parameter issue in Airflow.
"""

## ....
## ....
## ....


# Define the default arguments
default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 7, 5),
    "retries": 3,
}


# Define the DAG
@dag(
    dag_id="oss_contrib_airflow_neo4j_pipeline",
    description="Neo4j provider sample",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2025, 7, 5),
    catchup=False,
    tags=["neo4j provider"],
)
def oss_contrib_airflow_neo4j_pipeline():
    """
    DAG to test parameters binding using Neo4jProvider in Airflow.
    """

    ## connection uri: 
    neo4j_bolt_vars = Variable.get("bolt_neo4j", default_var="{}", deserialize_json=True)

    start = EmptyOperator(task_id="start")

    update_graph_db = Neo4jOperator(
        task_id="upd_graph_db",
        parameters={"name": "Airflow"},
        neo4j_conn_id=neo4j_bolt_vars.get("conn_id", "bolt_neo4j"),
        sql="""
          // schema neo4j
          MERGE (a:Person {name: $name})
          ON CREATE SET a.created = timestamp()
          ON MATCH SET a.updated = timestamp()
        """,
    )

    end = EmptyOperator(task_id="end")

    start >> update_graph_db >> end


DAG_INSTANCE = oss_contrib_airflow_neo4j_pipeline()

Result

Connected to Neo4j using Bolt protocol version 5.8 at neo4j://localhost:7687.
Type :help for a list of available commands or :exit to exit the shell.
Note that Cypher queries must end with a semicolon.
@neo4j> MATCH (a:Person) RETURN a;
+-----------------------------------------------------+
| a                                                   |
+-----------------------------------------------------+
| (:Person {name: "Airflow", created: 1751911510657}) |
+-----------------------------------------------------+

1 row
ready to start consuming query after 76 ms, results consumed after another 3 ms
@neo4j>

Logs

[2025-07-07T18:04:48.747+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/oss_contrib_airflow_neo4j.py
<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
[2025-07-07T18:05:07.069+0000] {oss_contrib_airflow_neo4j.py:50} INFO - create_or_update_connection: Connection bolt_neo4j already exsits:
[2025-07-07T18:05:07.498+0000] {task_command.py:467} INFO - Running <TaskInstance: oss_contrib_airflow_neo4j_pipeline.upd_graph_db manual__2025-07-07T17:09:15.242788+00:00 [queued]> on host oss-contrib-airflow-neo4j-pipeline-upd-graph-db-zh43aw29
[2025-07-07T18:05:08.102+0000] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2025-07-07T18:05:08.305+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: oss_contrib_airflow_neo4j_pipeline.upd_graph_db manual__2025-07-07T17:09:15.242788+00:00 [queued]>
[2025-07-07T18:05:08.373+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: oss_contrib_airflow_neo4j_pipeline.upd_graph_db manual__2025-07-07T17:09:15.242788+00:00 [queued]>
[2025-07-07T18:05:08.375+0000] {taskinstance.py:2867} INFO - Starting attempt 4 of 6
[2025-07-07T18:05:08.800+0000] {taskinstance.py:2890} INFO - Executing <Task(Neo4jOperator): upd_graph_db> on 2025-07-07 17:09:15.242788+00:00
[2025-07-07T18:05:08.843+0000] {standard_task_runner.py:72} INFO - Started process 41 to run task
[2025-07-07T18:05:08.851+0000] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'oss_contrib_airflow_neo4j_pipeline', 'upd_graph_db', 'manual__2025-07-07T17:09:15.242788+00:00', '--job-id', '236', '--raw', '--subdir', 'DAGS_FOLDER/oss_contrib_airflow_neo4j.py', '--cfg-path', '/tmp/tmpfututloq']
[2025-07-07T18:05:08.857+0000] {standard_task_runner.py:105} INFO - Job 236: Subtask upd_graph_db
[2025-07-07T18:05:09.447+0000] {task_command.py:467} INFO - Running <TaskInstance: oss_contrib_airflow_neo4j_pipeline.upd_graph_db manual__2025-07-07T17:09:15.242788+00:00 [running]> on host oss-contrib-airflow-neo4j-pipeline-upd-graph-db-zh43aw29
[2025-07-07T18:05:10.175+0000] {pod_generator.py:472} WARNING - Model file /opt/airflow/pod_templates/pod_template_file.yaml does not exist
[2025-07-07T18:05:10.526+0000] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='oss_contrib_airflow_neo4j_pipeline' AIRFLOW_CTX_TASK_ID='upd_graph_db' AIRFLOW_CTX_EXECUTION_DATE='2025-07-07T17:09:15.242788+00:00' AIRFLOW_CTX_TRY_NUMBER='4' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-07-07T17:09:15.242788+00:00'
[2025-07-07T18:05:10.531+0000] {taskinstance.py:732} INFO - ::endgroup::
[2025-07-07T18:05:10.568+0000] {neo4j.py:159} INFO - Executing:
USE neo4j
MERGE (a:Person {name: $name})
ON CREATE SET a.created = timestamp()
ON MATCH SET a.updated = timestamp()
[2025-07-07T18:05:10.570+0000] {neo4j.py:160} INFO - Parameters: {'name': 'Airflow'}
[2025-07-07T18:05:10.576+0000] {neo4j.py:113} INFO - Run: {'name': 'Airflow'}
[2025-07-07T18:05:10.596+0000] {base.py:84} INFO - Retrieving connection 'bolt_neo4j'
[2025-07-07T18:05:10.604+0000] {neo4j.py:50} INFO - URI: bolt://neo4j.neo4j.svc.cluster.local:7687
[2025-07-07T18:05:10.940+0000] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-07-07T18:05:10.945+0000] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=oss_contrib_airflow_neo4j_pipeline, task_id=upd_graph_db, run_id=manual__2025-07-07T17:09:15.242788+00:00, execution_date=20250707T170915, start_date=20250707T180508, end_date=20250707T180510
[2025-07-07T18:05:11.372+0000] {local_task_job_runner.py:266} INFO - Task exited with return code 0
[2025-07-07T18:05:11.524+0000] {local_task_job_runner.py:245} INFO - ::endgroup::

Dag UI

[Screenshot: pr-fix]


@boring-cyborg

boring-cyborg bot commented Jul 7, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@rmidhun23 rmidhun23 marked this pull request as ready for review July 7, 2025 18:55
@eladkal
Contributor

eladkal commented Jul 8, 2025

Tests are failing

providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py:66: error: Missing positional argument "parameters" in call to "run" of "Neo4jHook"  [call-arg]
            hook.run(self.sql)
            ^~~~~~~~~~~~~~~~~~
Found 1 error in 1 file (checked 3935 source files)
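
An error of this kind is commonly resolved in one of two ways: the call site passes the operator's parameters, or `parameters` gets a default of `None` so that existing single-argument calls still bind. Which of these the PR chose is not shown in this thread, so the following is a sketch under that assumption. `run_required`, `run_optional`, and `accepts_single_argument` are hypothetical stand-ins, not Neo4jHook methods.

```python
import inspect
from typing import Callable, Optional


def run_required(query: str, parameters: dict) -> str:
    """Mirrors the failing shape: `parameters` is a required positional argument."""
    return query


def run_optional(query: str, parameters: Optional[dict] = None) -> str:
    """Backwards-compatible shape: callers may omit `parameters`."""
    return query


def accepts_single_argument(func: Callable) -> bool:
    """Return True if `func` can be called with only the query argument."""
    try:
        # bind() raises TypeError when a required argument is missing.
        inspect.signature(func).bind("RETURN 1")
    except TypeError:
        return False
    return True
```

With the optional form, a call like `hook.run(self.sql)` would keep type-checking, while a parameterized call could pass the mapping as a second argument.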

@rmidhun23
Contributor Author

@BBQing The review comments are addressed. Please review. Thanks!

@rmidhun23 rmidhun23 requested a review from BBQing July 9, 2025 14:50
Contributor

@eladkal eladkal left a comment

I have some concerns about this feature/bug fix.

When using the Neo4jOperator to execute cypher queries, the parameters argument is not being passed correctly. The operator is designed to accept a parameters argument, but this argument is not utilized.

This suggests a bug fix, since the claim is that parameters is not working, but this parameter does not exist today. This PR is what adds it.

I am not saying no to this PR, but I am requesting some clarity here.
Please describe exactly what is missing: what real-life use case cannot be served properly with the current operator?

@BBQing
Contributor

BBQing commented Jul 9, 2025

@rmidhun23 now it looks fine to me. I have only one question - in the PR description you have claimed successful result, yet the submitted code contained bugs - very simple and obvious bugs. How did it happen to you? What went wrong in your workflow? Just out of curiosity.

@rmidhun23
Contributor Author

hi @eladkal.

The issue is that the parameters accepted by the operator are not passed along when the query runs in the Neo4j driver session. This PR aims to allow users to run both static and parameterized queries for Neo4j.

The operator currently supports only static queries, so there is no option to use dynamic values in a query. An example use case is a query that relies on upstream data.

Let me know if this clarifies my intentions with this fix.
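
To illustrate the upstream-data case, here is a small sketch that is not taken from the PR. `build_interpolated` and `build_parameterized` are hypothetical helpers. The first shows the workaround a static-only operator pushes users toward, splicing a value into the Cypher text. The second keeps the query text fixed and supplies the value separately through a `$name` placeholder, as in the DAG in the PR description.

```python
from typing import Any, Dict, Tuple


def build_interpolated(name: str) -> str:
    """Static-query workaround: splice the value directly into the Cypher text."""
    return "MERGE (a:Person {name: '%s'})" % name


def build_parameterized(name: str) -> Tuple[str, Dict[str, Any]]:
    """Parameterized form: constant query text plus a separate parameters mapping."""
    return "MERGE (a:Person {name: $name})", {"name": name}


# A value arriving from an upstream task; the apostrophe breaks naive quoting.
upstream_name = "O'Brien"

# The interpolated text now contains an unbalanced quote.
broken_query = build_interpolated(upstream_name)

# The parameterized query text is unaffected by the value it is paired with.
query, params = build_parameterized(upstream_name)
```

In the parameterized form the query string stays the same for every upstream value, and only the mapping changes.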

@rmidhun23
Contributor Author

@rmidhun23 now it looks fine to me. I have only one question - in the PR description you have claimed successful result, yet the submitted code contained bugs - very simple and obvious bugs. How did it happen to you? What went wrong in your workflow? Just out of curiosity.

@BBQing Thanks for helping me out. I verified that it works using the logs I shared here. However, I ran it from a copy of the operator that I created as a custom module with the changes and included in my DAG deployment.

@BBQing
Contributor

BBQing commented Jul 9, 2025

@rmidhun23 now it looks fine to me. I have only one question - in the PR description you have claimed successful result, yet the submitted code contained bugs - very simple and obvious bugs. How did it happen to you? What went wrong in your workflow? Just out of curiosity.

@BBQing Thanks for helping me out. I verified that it works using the logs I shared here. However, I ran it from a copy of the operator that I created as a custom module with the changes and included in my DAG deployment.

Thank you for your answer.

@rmidhun23 rmidhun23 force-pushed the fix/provider-neo4j branch 4 times, most recently from 13a2242 to e687699 Compare July 11, 2025 00:06
@rmidhun23
Contributor Author

hi @eladkal.

The issue is that the parameters accepted by the operator are not passed along when the query runs in the Neo4j driver session. This PR aims to allow users to run both static and parameterized queries for Neo4j.

The operator currently supports only static queries, so there is no option to use dynamic values in a query. An example use case is a query that relies on upstream data.

Let me know if this clarifies my intentions with this fix.

@eladkal Would like your comments on this, Thanks

@eladkal eladkal self-requested a review July 18, 2025 16:04
@potiuk potiuk force-pushed the fix/provider-neo4j branch from e687699 to 5cd5f75 Compare July 19, 2025 13:20
@potiuk
Member

potiuk commented Jul 19, 2025

hi @eladkal.
The issue is that the parameters accepted by the operator are not passed along when the query runs in the Neo4j driver session. This PR aims to allow users to run both static and parameterized queries for Neo4j.
The operator currently supports only static queries, so there is no option to use dynamic values in a query. An example use case is a query that relies on upstream data.
Let me know if this clarifies my intentions with this fix.

@eladkal Would like your comments on this, Thanks

To me, this is a new feature.

Contributor

@eladkal eladkal left a comment

LGTM

@eladkal
Contributor

eladkal commented Jul 19, 2025

This PR aims to allow users to run both static and parameterized queries for Neo4j.

Would be good to mention this also in the docs with reference to the Neo4j docs.

@potiuk
Member

potiuk commented Jul 19, 2025

Would be good to mention this also in the docs with reference to the Neo4j docs.

Maybe add/modify example ? @rmidhun23 ?

@rmidhun23
Contributor Author

Would be good to mention this also in the docs with reference to the Neo4j docs.

Maybe add/modify example ? @rmidhun23 ?

@potiuk @eladkal Sure, will add an example to the docs and get back to you.

@rmidhun23
Contributor Author

Would be good to mention this also in the docs with reference to the Neo4j docs.

Maybe add/modify example ? @rmidhun23 ?

@potiuk @eladkal Please review the docs content and let me know your thoughts.

@eladkal
Contributor

eladkal commented Jul 23, 2025

static checks fail

@rmidhun23 rmidhun23 force-pushed the fix/provider-neo4j branch from 5657157 to ad78ab0 Compare July 23, 2025 20:28
@eladkal eladkal merged commit 2cb6079 into apache:main Jul 24, 2025
71 checks passed
@rmidhun23
Contributor Author

@potiuk @eladkal Thanks for your review and suggestions !!

Development

Successfully merging this pull request may close these issues.

Missing parameters while query execution

4 participants