diff --git a/providers/common/sql/docs/operators.rst b/providers/common/sql/docs/operators.rst index 837a003739748..82016b3b9f3f5 100644 --- a/providers/common/sql/docs/operators.rst +++ b/providers/common/sql/docs/operators.rst @@ -148,6 +148,29 @@ The below example demonstrates how to instantiate the SQLTableCheckOperator task :end-before: [END howto_operator_sql_table_check] +.. _howto/operator:SQLValueCheckOperator: + +Check value against expected +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use the :class:`~airflow.providers.common.sql.operators.sql.SQLValueCheckOperator` to compare a SQL query result +against an expected value, with some optionally specified tolerance for numeric results. +The parameters for this operator are: + +- ``sql`` - the sql query to be executed, as a templated string. +- ``pass_value`` - the expected value to compare the query result against. +- ``tolerance`` (optional) - numerical tolerance for comparisons involving numeric values. +- ``conn_id`` (optional) - the connection ID used to connect to the database. +- ``database`` (optional) - name of the database which overwrites the name defined in the connection. + +The below example demonstrates how to instantiate the SQLValueCheckOperator task. + +.. exampleinclude:: /../tests/system/common/sql/example_sql_value_check.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_sql_value_check] + :end-before: [END howto_operator_sql_value_check] + .. _howto/operator:SQLThresholdCheckOperator: Check values against a threshold diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 98a8e240ac457..ab9569ee18f9c 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -862,6 +862,9 @@ class SQLValueCheckOperator(BaseSQLOperator): :param sql: the sql to be executed. (templated) :param conn_id: the connection ID used to connect to the database. :param database: name of database which overwrite the defined one in connection + :param pass_value: the value to check against + :param tolerance: (optional) the tolerance allowed to pass records within for + numeric queries """ __mapper_args__ = {"polymorphic_identity": "SQLValueCheckOperator"} diff --git a/providers/common/sql/tests/system/common/sql/example_sql_value_check.py b/providers/common/sql/tests/system/common/sql/example_sql_value_check.py new file mode 100644 index 0000000000000..a904cdcd11d59 --- /dev/null +++ b/providers/common/sql/tests/system/common/sql/example_sql_value_check.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow import DAG +from airflow.providers.common.sql.operators.sql import SQLValueCheckOperator +from airflow.sdk.timezone import datetime + +connection_args = { + "conn_id": "sales_db", + "conn_type": "Postgres", + "host": "postgres", + "schema": "postgres", + "login": "postgres", + "password": "postgres", + "port": 5432, +} + +with DAG( + "example_sql_value_check_query", + description="Example DAG for SQLValueCheckOperator.", + default_args=connection_args, + start_date=datetime(2025, 12, 15), + schedule=None, + catchup=False, +) as dag: + """ + ### Example SQL value check DAG + + Runs the SQLValueCheckOperator against the Airflow metadata DB. + """ + + # [START howto_operator_sql_value_check] + value_check = SQLValueCheckOperator( + task_id="threshhold_check", + conn_id="sales_db", + sql="SELECT count(distinct(customer_id)) FROM sales LIMIT 50;", + pass_value=40, + tolerance=5, + ) + # [END howto_operator_sql_value_check] + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)