Description
Apache Airflow Provider(s)
neo4j
Versions of Apache Airflow Providers
apache-airflow-providers-http==5.3.0
apache-airflow-providers-cncf-kubernetes==10.5.0
apache-airflow-providers-neo4j==3.9.0
Apache Airflow version
2.10.5
Operating System
macOS Sequoia Version 15.4.1
Deployment
Official Apache Airflow Helm Chart
Deployment details
Docker:
Version: 20.10.14, build a224086
Kubernetes:
Client Version: v1.22.5
Server Version: v1.33.1
kind:
Version: v0.29.0 go1.24.2 darwin/arm64
airflow (Helm):
Chart: airflow-1.16.0 App Version: 2.10.5
neo4j (Helm):
Chart: neo4j-2025.4.0 App Version: 2025.04.0
What happened
When using the Neo4jOperator to execute Cypher queries, the parameters argument is not passed through to the query. The operator accepts a parameters argument, but the traceback below shows execute calling hook.run(self.sql) and the hook calling session.run(query), so the argument is never used when the query runs. As a result, the query fails and the task eventually fails with neo4j.exceptions.GqlError (gql_status 42N81: syntax error or access rule violation - missing request parameter).
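The failure mode can be illustrated without a running database. The following is only a minimal sketch: StubSession, StubHook, StubOperator and MissingParameterError are hypothetical stand-ins written for this illustration, not the provider's or the driver's actual classes. The call shapes hook.run(self.sql) and session.run(query) are taken from the traceback in this report; everything else is an assumption.

```python
import re


class MissingParameterError(Exception):
    """Hypothetical stand-in for the driver's ParameterMissing client error."""


class StubSession:
    """Hypothetical session that only checks that every $placeholder has a value."""

    def run(self, query, parameters=None):
        supplied = parameters or {}
        expected = set(re.findall(r"\$(\w+)", query))
        missing = sorted(expected - set(supplied))
        if missing:
            raise MissingParameterError(
                f"Expected parameter(s): {', '.join(missing)}"
            )
        return {"query": query, "parameters": dict(supplied)}


class StubHook:
    """Hypothetical hook whose run() has no way to receive parameters."""

    def __init__(self):
        self.session = StubSession()

    def run(self, query):
        # Same call shape as in the traceback: session.run(query)
        return self.session.run(query)


class StubOperator:
    """Hypothetical operator that stores parameters but never forwards them."""

    def __init__(self, sql, parameters=None):
        self.sql = sql
        self.parameters = parameters  # accepted, then ignored

    def execute(self):
        # Same call shape as in the traceback: hook.run(self.sql)
        return StubHook().run(self.sql)
```

Calling StubOperator("MERGE (a:Person {name: $name})", parameters={"name": "Airflow"}).execute() raises MissingParameterError in this sketch, because the stored parameters never reach the session.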
Error
..... ..... .....[2025-07-01T16:34:41.181+0000] {base.py:84} INFO - Retrieving connection 'bolt_neo4j'
[2025-07-01T16:34:41.187+0000] {neo4j.py:61} INFO - URI: bolt://neo4j.neo4j.svc.cluster.local:7687
[2025-07-01T16:34:41.294+0000] {taskinstance.py:3313} ERROR - Task failed with exception
neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: error: syntax error or access rule violation - missing request parameter . Expected $name, but got .} {message: 42N81: Expected $name, but got .} {diagnostic_record: {'_classification': 'CLIENT_ERROR', 'OPERATION': '', 'OPERATION_CODE': '0', 'CURRENT_SCHEMA': '/'}} {raw_classification: CLIENT_ERROR}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/operators/neo4j.py", line 65, in execute
hook.run(self.sql)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/hooks/neo4j.py", line 127, in run
result = session.run(query)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/session.py", line 328, in run
self._auto_result._run(
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 236, in _run
self._attach()
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 430, in _attach
self._connection.fetch_message()
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 184, in inner
func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt.py", line 864, in fetch_message
res = self._process_message(tag, fields)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt5.py", line 1208, in _process_message
response.on_failure(summary_metadata or {})
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 254, in on_failure
raise self._hydrate_error(metadata)
neo4j.exceptions.ClientError: {code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): name}
[2025-07-01T16:34:41.452+0000] {taskinstance.py:1226} INFO - Marking task as UP_FOR_RETRY. dag_id=test_graph_pipeline, task_id=up_graph_db, run_id=manual__2025-07-01T15:53:36.426021+00:00, execution_date=20250701T155336, start_date=20250701T163439, end_date=20250701T163441
[2025-07-01T16:34:41.786+0000] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-07-01T16:34:41.788+0000] {standard_task_runner.py:124} ERROR - Failed to execute job 385 for task up_graph_db ({code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): name}; 41)
neo4j.exceptions.GqlError: {gql_status: 42N81} {gql_status_description: error: syntax error or access rule violation - missing request parameter . Expected $name, but got .} {message: 42N81: Expected $name, but got .} {diagnostic_record: {'_classification': 'CLIENT_ERROR', 'OPERATION': '', 'OPERATION_CODE': '0', 'CURRENT_SCHEMA': '/'}} {raw_classification: CLIENT_ERROR}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
ret = args.func(args, dag=self.dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
return _run_raw_task(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
return ti._run_raw_task(
^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3006, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 274, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3161, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3185, in _execute_task
return _execute_task(self, context, task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/operators/neo4j.py", line 65, in execute
hook.run(self.sql)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/neo4j/hooks/neo4j.py", line 127, in run
result = session.run(query)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/session.py", line 328, in run
self._auto_result._run(
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 236, in _run
self._attach()
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/work/result.py", line 430, in _attach
self._connection.fetch_message()
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 184, in inner
func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt.py", line 864, in fetch_message
res = self._process_message(tag, fields)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_bolt5.py", line 1208, in _process_message
response.on_failure(summary_metadata or {})
File "/home/airflow/.local/lib/python3.12/site-packages/neo4j/_sync/io/_common.py", line 254, in on_failure
raise self._hydrate_error(metadata)
neo4j.exceptions.ClientError: {code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): name}
[2025-07-01T16:34:41.826+0000] {local_task_job_runner.py:266} INFO - Task exited with return code 1
[2025-07-01T16:34:41.986+0000] {local_task_job_runner.py:245} INFO - ::endgroup::
What you think should happen instead
The Neo4jOperator should correctly pass the parameters argument to the Cypher query execution. This would allow for dynamic queries that accept parameters at runtime, and it should support either plain arguments or an XComArg from upstream tasks.
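The expected forwarding could look roughly like the sketch below. This is not the provider's code: RecordingSession, ForwardingHook and ForwardingOperator are hypothetical names used only for illustration, and the optional parameters argument on the hook's run() is an assumption about how a fix might be shaped. Resolving an XComArg into a plain value is left out of the sketch.

```python
class RecordingSession:
    """Hypothetical session that records what it was asked to run."""

    def __init__(self):
        self.calls = []

    def run(self, query, parameters=None):
        self.calls.append((query, parameters))
        return []


class ForwardingHook:
    """Hypothetical hook whose run() accepts and forwards parameters."""

    def __init__(self, session):
        self.session = session

    def run(self, query, parameters=None):
        # Forward parameters only when given, so plain queries behave as before.
        if parameters:
            return self.session.run(query, parameters)
        return self.session.run(query)


class ForwardingOperator:
    """Hypothetical operator that hands its parameters to the hook."""

    def __init__(self, sql, parameters=None, hook=None):
        self.sql = sql
        self.parameters = parameters
        self.hook = hook

    def execute(self):
        return self.hook.run(self.sql, self.parameters)
```

With this shape, the session receives both the query text and the {"name": "Airflow"} mapping, while a query without placeholders can still be run without passing parameters.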
How to reproduce
- Deploy airflow and neo4j using Helm charts.
- Run a DAG with Neo4jOperator with parameters argument and a query that uses it.
- Neo4jOperator task fails with error code GQLSTATUS:42N81 due to missing parameters.
import logging
import json
from datetime import datetime

# pylint: disable=import-error
from airflow.decorators import dag
from airflow.models import Connection, Variable
from airflow.utils.session import provide_session
from airflow.operators.empty import EmptyOperator
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@provide_session
def create_or_update_connection(
    conn_id, conn_type, host, schema=None, port=None, extra=None, session=None
):
    """
    Create or update a connection in Airflow.
    """
    conn = session.query(Connection).filter(Connection.conn_id == conn_id).first()
    if conn is None:
        conn = Connection(
            conn_id=conn_id,
            conn_type=conn_type,
            description=None,
            host=host,
            login=None,
            password=None,
            schema=schema,
            port=port,
            extra=extra,
        )
        session.add(conn)
        session.commit()
        logger.info("create_or_update_connection: Connection %s created", conn_id)
    else:
        logger.info(
            "create_or_update_connection: Connection %s already exists:", conn_id
        )
        conn.conn_type = conn_type
        conn.host = host
        conn.schema = schema
        conn.port = port
        conn.extra = extra
        session.commit()
    session.close()


def merge_neo4j_bolt_connection(var):
    """
    Create or update the Neo4j bolt connection in Airflow.
    """
    ## Assumes neo4j standalone deployment
    ## Using bolt unsecured on 7687
    conn_id = var.get("conn_id", "bolt_neo4j")
    conn_type = var.get("conn_type", "neo4j")
    port = int(var.get("port", 7687))
    host = var.get("host", "neo4j.neo4j.svc.cluster.local")
    schema = var.get("schema", "neo4j")
    extra_options = var.get("extra", json.loads("""{ "encrypted": false }"""))
    create_or_update_connection(
        conn_id=conn_id,
        conn_type=conn_type,
        host=host,
        schema=schema,
        port=port,
        extra=json.dumps(extra_options),
    )


# Merge the connection
neo4j_bolt_vars = Variable.get("bolt_neo4j", default_var="{}", deserialize_json=True)
merge_neo4j_bolt_connection(neo4j_bolt_vars)


# Define the DAG
@dag(
    dag_id="test_graph_pipeline",
    description="Neo4j provider sample",
    default_args={
        "owner": "airflow",
        "start_date": datetime(2025, 5, 25),
        "retries": 3,
    },
    schedule_interval=None,
    start_date=datetime(2025, 6, 23),
    catchup=False,
    tags=["airflow, neo4j provider"],
)
def test_graph_pipeline():
    """
    DAG to test parameters binding using Neo4jProvider in Airflow.
    """
    start = EmptyOperator(task_id="start")
    update_graph_db = Neo4jOperator(
        task_id="up_graph_db",
        parameters={"name": "Airflow"},
        neo4j_conn_id=neo4j_bolt_vars.get("conn_id", "bolt_neo4j"),
        sql="""
        USE neo4j
        MERGE (a:Person {name: $name})
        ON CREATE SET a.created = timestamp()
        ON MATCH SET a.updated = timestamp()
        """,
    )
    end = EmptyOperator(task_id="end")
    start >> update_graph_db >> end


DAG_INSTANCE = test_graph_pipeline()
Anything else
- Deploy airflow and neo4j using Helm charts.
- Run a DAG with Neo4jOperator with parameters argument and a query that uses it.
- Neo4jOperator task fails with error code GQLSTATUS:42N81 due to missing parameters.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
