diff --git a/docs/bigquery.md b/docs/bigquery.md
index 447a3880..aa1175e3 100644
--- a/docs/bigquery.md
+++ b/docs/bigquery.md
@@ -71,6 +71,8 @@ user_conf = {
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
+ # user_config.se_enable_error_table: True,
+ # user_config.se_dq_rules_params: { "env": "local", "table": "product", },
}
diff --git a/docs/configurations/configure_rules.md b/docs/configurations/configure_rules.md
index 0fd947d2..032524a1 100644
--- a/docs/configurations/configure_rules.md
+++ b/docs/configurations/configure_rules.md
@@ -46,7 +46,7 @@ false, false, true)
```
-Please set up rules for checking the quality of the columns in the artificially order table, using the specified format
+Please set up rules for checking the quality of the columns in the artificial order table, using the specified format
```sql
insert into `catalog`.`schema`.`{product}_rules` (product_id, table_name, rule_type, rule, column_name, expectation,
@@ -79,7 +79,7 @@ action_if_failed, tag, description) values
--The query dq rule is established to check product_id difference between two table if difference is more than 20%
--from source table, the metadata of the rule will be captured in the statistics table as "action_if_failed" is "ignore"
,('apla_nd', '`catalog`.`schema`.customer_order', 'query_dq', 'product_missing_count_threshold', '*',
-'((select count(distinct product_id) from product) - (select count(distinct product_id) from order))>
+'((select count(distinct product_id) from {table}) - (select count(distinct product_id) from order))>
(select count(distinct product_id) from product)*0.2', 'ignore', 'validity', 'row count threshold difference must
be less than 20%', true, true, true)
@@ -87,7 +87,7 @@ be less than 20%', true, true, true)
--metadata of the rule will be captured in the statistics table along with fails the job as "action_if_failed" is
--"fail" and enabled for source dataset
,('apla_nd', '`catalog`.`schema`.customer_order', 'query_dq', 'product_category', '*', '(select count(distinct category)
-from product) < 5', 'fail', 'validity', 'distinct product category must be less than 5', true, False, true)
+from {table}) < 5', 'fail', 'validity', 'distinct product category must be less than 5', true, False, true)
--The query dq rule is established to check count of the dataset should be less than 10000 other wise the metadata
--of the rule will be captured in the statistics table as "action_if_failed" is "ignore" and enabled only for target dataset
diff --git a/docs/configurations/databricks_setup_guide.md b/docs/configurations/databricks_setup_guide.md
index 43b7ea26..b092380a 100644
--- a/docs/configurations/databricks_setup_guide.md
+++ b/docs/configurations/databricks_setup_guide.md
@@ -5,6 +5,6 @@ This section provides instructions on how to set up a sample notebook in the Dat
#### Prerequisite:
1. Recommended Databricks run time environment for better experience - DBS 11.0 and above
-3. Please install the Kafka jar using the path `dbfs:/kafka-jars/databricks-shaded-strimzi-kafka-oauth-client-1.1.jar`, If the jar is not available in the dbfs location, please raise a ticket with GAP Support team to add the jar to your workspace
-2. Please follow the steps provided [here](TODO) to integrate and clone repo from git Databricks
+2. Please install the Kafka jar using the path `dbfs:/kafka-jars/databricks-shaded-strimzi-kafka-oauth-client-1.1.jar`. If the jar is not available in the dbfs location, please raise a ticket with the Platform team to add the jar to your workspace
+3. Please follow the steps provided [here](TODO) to integrate and clone repo from git Databricks
4. Please follow the steps to create the webhook-hook URL for team-specific channel [here](TODO)
\ No newline at end of file
diff --git a/docs/configurations/migration_versions_comparison.md b/docs/configurations/migration_versions_comparison.md
index b199cf12..bb7f60ff 100644
--- a/docs/configurations/migration_versions_comparison.md
+++ b/docs/configurations/migration_versions_comparison.md
@@ -4,17 +4,17 @@ Please find the difference in the changes with different version, latest three v
-| stage | 0.8.0 | 1.0.0 | 1.2.0 |
-|:----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------|
-| rules table schema changes | added additional two column
1.`enable_error_drop_alert(boolean)`
2.`error_drop_threshold(int)`
documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same | Remains same |
-| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
-| stats table schema changes | remains same | Remains Same | Remains same. Additionally all row dq rules stats get in row dq rules summary |
-| stats table creation required | automated | Remains Same | Remains same |
-| notification config setting | remains same | Remains Same | Remains same |
-| secret store and Kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed, in SparkExpectations class | Remains same |
-| spark expectations initialization | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
-| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
-| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provider the writer object to record the spark conf that need to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| stage | 0.8.0 | 1.0.0 | 1.2.0 |
+|:----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------|
+| rules table schema changes | added additional two column
1.`enable_error_drop_alert(boolean)`
2.`error_drop_threshold(int)`
documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same | Remains same |
+| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required | Remains same. Additionally dq rules dynamically updates based on the parameter passed from externally |
+| stats table schema changes | remains same | Remains Same | Remains same. Additionally all row dq rules stats get in row dq rules summary |
+| stats table creation required | automated | Remains Same | Remains same |
+| notification config setting | remains same | Remains Same | Remains same |
+| secret store and Kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed, in SparkExpectations class | Remains same |
+| spark expectations initialization | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provider the writer object to record the spark conf that need to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
diff --git a/docs/delta.md b/docs/delta.md
index e1a7c802..0baf2c79 100644
--- a/docs/delta.md
+++ b/docs/delta.md
@@ -61,6 +61,8 @@ user_conf = {
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
+ # user_config.se_enable_error_table: True,
+ # user_config.se_dq_rules_params: { "env": "local", "table": "product", },
}
diff --git a/docs/examples.md b/docs/examples.md
index b9e7b1dd..d4e2c9e0 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -20,7 +20,11 @@ se_user_conf = {
user_config.se_notifications_on_fail: True, # (11)!
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (12)!
user_config.se_notifications_on_error_drop_threshold: 15, # (13)!
- user_config.se_enable_error_table: True, # (14)!
+ user_config.se_enable_error_table: True, # (14)!
+ user_config.se_dq_rules_params: {
+ "env": "local",
+ "table": "product",
+ }, # (15)!
}
}
```
@@ -39,6 +43,7 @@ se_user_conf = {
12. When `user_config.se_notifications_on_error_drop_exceeds_threshold_breach` parameter set to `True` enables notification when error threshold reaches above the configured value
13. The `user_config.se_notifications_on_error_drop_threshold` parameter captures error drop threshold value
14. The `user_config.se_enable_error_table` parameter, which controls whether error data to load into error table, is set to true by default
+15. The `user_config.se_dq_rules_params` parameter captures the values which are required to dynamically update dq rules
### Spark Expectations Initialization
diff --git a/docs/iceberg.md b/docs/iceberg.md
index 4628f69d..8dbeda78 100644
--- a/docs/iceberg.md
+++ b/docs/iceberg.md
@@ -67,6 +67,8 @@ user_conf = {
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
+ # user_config.se_enable_error_table: True,
+ # user_config.se_dq_rules_params: { "env": "local", "table": "product", },
}
diff --git a/spark_expectations/config/user_config.py b/spark_expectations/config/user_config.py
index 7096d266..c73fb5db 100644
--- a/spark_expectations/config/user_config.py
+++ b/spark_expectations/config/user_config.py
@@ -41,6 +41,7 @@ class Constants:
se_enable_streaming = "se.enable.streaming"
se_enable_error_table = "se.enable.error.table"
+ se_dq_rules_params = "se.dq.rules.params"
secret_type = "se.streaming.secret.type"
diff --git a/spark_expectations/core/context.py b/spark_expectations/core/context.py
index 35b0232c..0e365315 100644
--- a/spark_expectations/core/context.py
+++ b/spark_expectations/core/context.py
@@ -38,6 +38,7 @@ def __post_init__(self) -> None:
self._dq_run_status: str = "Failed"
self._dq_expectations: Optional[Dict[str, str]] = None
self._se_enable_error_table: bool = True
+ self._dq_rules_params: Dict[str, str] = {}
# above configuration variable value has to be set to python
self._dq_project_env_name = "spark_expectations"
@@ -1566,7 +1567,7 @@ def get_stats_table_writer_config(self) -> dict:
"""
return self._stats_table_writer_config
- def set_se_enable_error_table(self, se_enable_error_table: bool) -> None:
+ def set_se_enable_error_table(self, _enable_error_table: bool) -> None:
"""
Args:
@@ -1575,7 +1576,7 @@ def set_se_enable_error_table(self, se_enable_error_table: bool) -> None:
Returns:
"""
- self._se_enable_error_table = se_enable_error_table
+ self._se_enable_error_table = _enable_error_table
@property
def get_se_enable_error_table(self) -> bool:
@@ -1585,3 +1586,23 @@ def get_se_enable_error_table(self) -> bool:
"""
return self._se_enable_error_table
+
+ def set_dq_rules_params(self, _dq_rules_params: dict) -> None:
+ """
+ This function sets params for dq rules
+ Args:
+ _dq_rules_params:
+
+ Returns:
+
+ """
+ self._dq_rules_params = _dq_rules_params
+
+ @property
+ def get_dq_rules_params(self) -> dict:
+ """
+ This function returns params which are mapped in dq rules
+ Returns: _dq_rules_params(dict)
+
+ """
+ return self._dq_rules_params
diff --git a/spark_expectations/core/expectations.py b/spark_expectations/core/expectations.py
index b1b98e54..612a7f20 100644
--- a/spark_expectations/core/expectations.py
+++ b/spark_expectations/core/expectations.py
@@ -107,14 +107,16 @@ def with_expectations(
def _except(func: Any) -> Any:
# variable used for enabling notification at different level
- _default_notification_dict: Dict[str, Union[str, int, bool]] = {
+ _default_notification_dict: Dict[
+ str, Union[str, int, bool, Dict[str, str]]
+ ] = {
user_config.se_notifications_on_start: False,
user_config.se_notifications_on_completion: False,
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: False,
user_config.se_notifications_on_error_drop_threshold: 100,
}
- _notification_dict: Dict[str, Union[str, int, bool]] = (
+ _notification_dict: Dict[str, Union[str, int, bool, Dict[str, str]]] = (
{**_default_notification_dict, **user_conf}
if user_conf
else _default_notification_dict
@@ -136,6 +138,18 @@ def _except(func: Any) -> Any:
else _default_stats_streaming_dict
)
+ enable_error_table = _notification_dict.get(
+ user_config.se_enable_error_table, True
+ )
+ self._context.set_se_enable_error_table(
+ enable_error_table if isinstance(enable_error_table, bool) else True
+ )
+
+ dq_rules_params = _notification_dict.get(user_config.se_dq_rules_params, {})
+ self._context.set_dq_rules_params(
+ dq_rules_params if isinstance(dq_rules_params, dict) else {}
+ )
+
# Overwrite the writers if provided by the user in the with_expectations explicitly
if target_and_error_table_writer:
self._context.set_target_and_error_table_writer_config(
@@ -144,7 +158,7 @@ def _except(func: Any) -> Any:
# need to call the get_rules_frm_table function to get the rules from the table as expectations
expectations, rules_execution_settings = self.reader.get_rules_from_df(
- self.rules_df, target_table
+ self.rules_df, target_table, params=self._context.get_dq_rules_params
)
_row_dq: bool = rules_execution_settings.get("row_dq", False)
@@ -200,18 +214,13 @@ def _except(func: Any) -> Any:
)
else False
)
+
+ notifications_on_error_drop_threshold = _notification_dict.get(
+ user_config.se_notifications_on_error_drop_threshold, 100
+ )
_error_drop_threshold: int = (
- int(
- _notification_dict[
- user_config.se_notifications_on_error_drop_threshold
- ]
- )
- if isinstance(
- _notification_dict[
- user_config.se_notifications_on_error_drop_threshold
- ],
- int,
- )
+ notifications_on_error_drop_threshold
+ if isinstance(notifications_on_error_drop_threshold, int)
else 100
)
@@ -416,6 +425,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
# _dq_final_agg_results: final agg dq result in dictionary
# _: number of error records
# status: status of the execution
+
(
_final_dq_df,
_dq_final_agg_results,
diff --git a/spark_expectations/examples/base_setup.py b/spark_expectations/examples/base_setup.py
index 8b873bc6..e4ecab1b 100644
--- a/spark_expectations/examples/base_setup.py
+++ b/spark_expectations/examples/base_setup.py
@@ -22,19 +22,19 @@
"""
RULES_DATA = """
- ("your_product", "dq_spark_local.customer_order", "row_dq", "customer_id_is_not_null", "customer_id", "customer_id is not null","drop", "validity", "customer_id ishould not be null", true, true,false, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "row_dq", "sales_greater_than_zero", "sales", "sales > 2", "drop", "accuracy", "sales value should be greater than zero", true, true, true, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, false, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "row_dq", "ship_mode_in_set", "ship_mode", "lower(trim(ship_mode)) in('second class', 'standard class', 'standard class')", "drop", "validity", "ship_mode mode belongs in the sets", true, true, false, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "drop", "validity", "profit threshold should be greater tahn 0", true, true, false, true, 0)
+ ("your_product", "dq_spark_local.customer_order", "row_dq", "customer_id_is_not_null", "customer_id", "customer_id is not null","drop", "validity", "customer_id ishould not be null", true, true,true, false, 0)
+ ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "sales_greater_than_zero", "sales", "sales > 2", "drop", "accuracy", "sales value should be greater than zero", true, true, true, false, 0)
+ ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, true, false, 0)
+ ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "ship_mode_in_set", "ship_mode", "lower(trim(ship_mode)) in('second class', 'standard class', 'standard class')", "drop", "validity", "ship_mode mode belongs in the sets", true, true, true, false, 0)
+ ,("your_product", "dq_spark_local.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "drop", "validity", "profit threshold should be greater tahn 0", true, true, true, true, 0)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", "sales", "sum(sales)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_quantity", "quantity", "sum(sales)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode)<=3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0)
+ ,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode)<=3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "row_count", "*", "count(*)>=10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(distinct product_id) from product) - (select count(distinct product_id) from order))>(select count(distinct product_id) from product)*0.2", "ignore", "validity", "row count threshold", true, true, true, false, 0)
- ,("your_product", "dq_spark_local.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from product) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0)
+ ,("your_product", "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(distinct product_id) from {table}) - (select count(distinct product_id) from order))>(select count(distinct product_id) from product)*0.2", "ignore", "validity", "row count threshold", true, true, true, false, 0)
+ ,("your_product", "dq_spark_{env}.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from {table}) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0)
,("your_product", "dq_spark_local.customer_order", "query_dq", "row_count_in_order", "*", "(select count(*) from order)<10000", "ignore", "accuracy", "count of the row in order dataset", true, true, true, false, 0)
"""
diff --git a/spark_expectations/examples/sample_dq_bigquery.py b/spark_expectations/examples/sample_dq_bigquery.py
index e55417a5..fe2de344 100644
--- a/spark_expectations/examples/sample_dq_bigquery.py
+++ b/spark_expectations/examples/sample_dq_bigquery.py
@@ -56,6 +56,11 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
+ user_config.se_enable_error_table: True,
+ user_config.se_dq_rules_params: {
+ "env": "local",
+ "table": "product",
+ },
}
diff --git a/spark_expectations/examples/sample_dq_delta.py b/spark_expectations/examples/sample_dq_delta.py
index 636a6d4f..4188316a 100644
--- a/spark_expectations/examples/sample_dq_delta.py
+++ b/spark_expectations/examples/sample_dq_delta.py
@@ -40,6 +40,11 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
+ user_config.se_enable_error_table: True,
+ user_config.se_dq_rules_params: {
+ "env": "local",
+ "table": "product",
+ },
}
diff --git a/spark_expectations/examples/sample_dq_iceberg.py b/spark_expectations/examples/sample_dq_iceberg.py
index a9128c37..3519c82d 100644
--- a/spark_expectations/examples/sample_dq_iceberg.py
+++ b/spark_expectations/examples/sample_dq_iceberg.py
@@ -39,6 +39,11 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
+ user_config.se_enable_error_table: True,
+ user_config.se_dq_rules_params: {
+ "env": "local",
+ "table": "product",
+ },
}
diff --git a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py
index ec8d1e22..677388ab 100644
--- a/spark_expectations/sinks/utils/writer.py
+++ b/spark_expectations/sinks/utils/writer.py
@@ -418,6 +418,9 @@ def write_error_records_final(
error_df = df.filter(f"size(meta_{rule_type}_results) != 0")
self._context.print_dataframe_with_debugger(error_df)
+ print(
+ f"self._context.get_se_enable_error_table : {self._context.get_se_enable_error_table}"
+ )
if self._context.get_se_enable_error_table:
self.save_df_as_table(
error_df,
diff --git a/spark_expectations/utils/actions.py b/spark_expectations/utils/actions.py
index f0deaa57..a33d27df 100644
--- a/spark_expectations/utils/actions.py
+++ b/spark_expectations/utils/actions.py
@@ -194,7 +194,7 @@ def run_dq_rules(
if rule_type == _context.get_agg_dq_rule_type_name
else _context.get_supported_df_query_dq
)
-
+ print(f"condition_expressions : {condition_expressions}")
df = df.select(*condition_expressions)
df = df.withColumn(
f"meta_{rule_type}_results", array(*list(df.columns))
@@ -212,6 +212,7 @@ def run_dq_rules(
_context.print_dataframe_with_debugger(df)
elif rule_type == _context.get_row_dq_rule_type_name:
+ print(f"condition_expressions : {condition_expressions}")
df = df.select(col("*"), *condition_expressions)
else:
raise SparkExpectationsMiscException(
diff --git a/spark_expectations/utils/reader.py b/spark_expectations/utils/reader.py
index b39c9bb6..7941c4de 100644
--- a/spark_expectations/utils/reader.py
+++ b/spark_expectations/utils/reader.py
@@ -140,6 +140,7 @@ def get_rules_from_df(
target_table: str,
is_dlt: bool = False,
tag: Optional[str] = None,
+ params: Optional[dict] = None,
) -> tuple[dict, dict]:
"""
This function fetches the data quality rules from the table and return it as a dictionary
@@ -149,6 +150,7 @@ def get_rules_from_df(
target_table: Provide the full table name for which the data quality rules are being run
is_dlt: True if this for fetching the rules for dlt job
tag: If is_dlt is True, provide the KPI for which you are running the data quality rule
+ params: dictionary values for dynamically updating dq rules
Returns:
tuple: returns a tuple of two dictionaries with key as 'rule_type' and 'rules_table_row' as value in
@@ -172,6 +174,9 @@ def get_rules_from_df(
& rules_df.is_active
)
+ if not params:
+ params = {}
+
self._context.print_dataframe_with_debugger(_rules_df)
_expectations: dict = {}
@@ -179,19 +184,19 @@ def get_rules_from_df(
if is_dlt:
if tag:
for row in _rules_df.filter(_rules_df.tag == tag).collect():
- _expectations[row["rule"]] = row["expectation"]
+ _expectations[row["rule"]] = row["expectation"].format(**params)
else:
for row in _rules_df.collect():
- _expectations[row["rule"]] = row["expectation"]
+ _expectations[row["rule"]] = row["expectation"].format(**params)
else:
for row in _rules_df.collect():
column_map = {
- "product_id": row["product_id"],
- "table_name": row["table_name"],
+ "product_id": row["product_id"].format(**params),
+ "table_name": row["table_name"].format(**params),
"rule_type": row["rule_type"],
- "rule": row["rule"],
+ "rule": row["rule"].format(**params),
"column_name": row["column_name"],
- "expectation": row["expectation"],
+ "expectation": row["expectation"].format(**params),
"action_if_failed": row["action_if_failed"],
"enable_for_source_dq_validation": row[
"enable_for_source_dq_validation"
diff --git a/tests/config/test_user_config.py b/tests/config/test_user_config.py
index 64f289ea..4fffd896 100644
--- a/tests/config/test_user_config.py
+++ b/tests/config/test_user_config.py
@@ -32,6 +32,8 @@ def test_constants():
assert user_config.se_enable_error_table == "se.enable.error.table"
+ assert user_config.se_dq_rules_params == "se.dq.rules.params"
+
assert user_config.secret_type == "se.streaming.secret.type"
assert user_config.cbs_url == "se.streaming.cerberus.url"
diff --git a/tests/core/test_context.py b/tests/core/test_context.py
index 9724ffa5..7f3aac7e 100644
--- a/tests/core/test_context.py
+++ b/tests/core/test_context.py
@@ -41,6 +41,7 @@ def test_context_properties():
context._final_agg_dq_status = "test_skipped"
context._dq_run_status = "test_failed"
context._se_enable_error_table = True
+ context._dq_rules_params = {}
context._dq_project_env_name = "APLAD-5063"
context._dq_config_file_name = "dq_spark_expectations_config.ini"
@@ -1655,3 +1656,13 @@ def test_set_enable_error_table():
# testing for False do not write error records in error table
context.set_se_enable_error_table(False)
assert context.get_se_enable_error_table is False
+
+
+def test_set_dq_rules_params():
+ # default case is empty dictionary for dq rules params and testing negative scenario
+ context = SparkExpectationsContext(product_id="product1", spark=spark)
+ assert context.get_dq_rules_params == {}
+
+ # testing when passing parameterized values to dq rules
+ context._dq_rules_params = {'env': 'local'}
+ assert context.get_dq_rules_params == {'env': 'local'}
diff --git a/tests/core/test_expectations.py b/tests/core/test_expectations.py
index d74efe6a..3fab3665 100644
--- a/tests/core/test_expectations.py
+++ b/tests/core/test_expectations.py
@@ -2062,7 +2062,81 @@ def _error_threshold_exceeds(expectations):
{"row_dq_status": "Passed", "source_agg_dq_status": "Passed",
"final_agg_dq_status": "Passed", "run_status": "Passed",
"source_query_dq_status": "Passed", "final_query_dq_status": "Passed"} # status
- )
+ ),
+ (
+ # Test case 23
+ # In this test case, dq run set for query_dq source_query_dq and one of the rules is parameterized
+ # with action_if_failed (ignore) for query_dq
+ # collect stats in the test_stats_table & error into error_table
+ spark.createDataFrame(
+ [
+ # sum of col1 must be greater than 10(ignore)
+ # standard deviation of col3 must be greater than 0(ignore)
+ {"col1": 1, "col2": "a", "col3": 4},
+ # row doesn't meet row_dq expectation1(drop)
+ {"col1": 2, "col2": "b", "col3": 5},
+ # row meets all row_dq_expectations
+ {"col1": 3, "col2": "c", "col3": 6},
+ # row meets all row_dq_expectations
+ ]
+ ),
+ [
+ {
+ "product_id": "product1",
+ "table_name": "dq_spark.test_final_table",
+ "rule_type": "query_dq",
+ "rule": "sum_col1_threshold",
+ "column_name": "col1",
+ "expectation": "(select sum(col1) from {table}) > 10",
+ "action_if_failed": "ignore",
+ "tag": "validity",
+ "description": "sum of col1 value must be greater than 10",
+ "enable_for_source_dq_validation": True,
+ "enable_for_target_dq_validation": True,
+ "is_active": True,
+ "enable_error_drop_alert": False,
+ "error_drop_threshold": "20",
+ },
+ {
+ "product_id": "product1",
+ "table_name": "dq_spark.test_final_table",
+ "rule_type": "query_dq",
+ "rule": "stddev_col3_threshold",
+ "column_name": "col3",
+ "expectation": "(select stddev(col3) from test_table) > 0",
+ "action_if_failed": "ignore",
+ "tag": "validity",
+ "description": "stddev of col3 value must be greater than 0",
+ "enable_for_source_dq_validation": True,
+ "enable_for_target_dq_validation": True,
+ "is_active": True,
+ "enable_error_drop_alert": False,
+ "error_drop_threshold": "20",
+ }
+ ],
+ False, # write to table
+ False, # write to temp table
+ None, # expected result
+ 3, # input count
+ 0, # error count
+ 0, # output count
+ None, # source_agg_result
+ None, # final_agg_result
+ # final_agg_result
+ [{"description": "sum of col1 value must be greater than 10",
+ "rule": "sum_col1_threshold",
+ "rule_type": "query_dq", "action_if_failed": "ignore", "tag": "validity"}],
+ # source_query_dq_res
+ None, # final_query_dq_res
+ {"rules": {"num_dq_rules": 2, "num_row_dq_rules": 0},
+ "query_dq_rules": {"num_final_query_dq_rules": 2, "num_source_query_dq_rules": 2,
+ "num_query_dq_rules": 2}, # dq_rules
+ "agg_dq_rules": {"num_source_agg_dq_rules": 0, "num_agg_dq_rules": 0,
+ "num_final_agg_dq_rules": 0}},
+ {"row_dq_status": "Skipped", "source_agg_dq_status": "Skipped",
+ "final_agg_dq_status": "Skipped", "run_status": "Passed",
+ "source_query_dq_status": "Passed", "final_query_dq_status": "Skipped"} # status
+ ),
])
def test_with_expectations(input_df,
expectations,
@@ -2102,10 +2176,11 @@ def test_with_expectations(input_df,
se._context.set_error_count(0)
se._context._run_id = "product1_run_test"
+
# Decorate the mock function with required args
@se.with_expectations(
"dq_spark.test_final_table",
- user_conf={user_config.se_notifications_on_fail: False},
+ user_conf={user_config.se_notifications_on_fail: False, user_config.se_dq_rules_params: {'table' : 'test_table'} },
write_to_table=write_to_table,
write_to_temp_table=write_to_temp_table
)
@@ -2243,7 +2318,7 @@ def test_with_expectations_dataframe_not_returned_exception(_fixture_create_data
def test_with_expectations_exception(_fixture_create_database,
_fixture_spark_expectations,
_fixture_local_kafka_topic):
- rules_dict = {
+ rules_dict = [{
"product_id": "product1",
"table_name": "dq_spark.test_table",
"rule_type": "row_dq",
@@ -2259,7 +2334,9 @@ def test_with_expectations_exception(_fixture_create_database,
"enable_error_drop_alert": True,
"error_drop_threshold": "10"
}
- rules_df = spark.createDataFrame([rules_dict])
+ ]
+
+ rules_df = spark.createDataFrame(rules_dict)
writer = WrappedDataFrameWriter().mode("append").format("delta")
se = SparkExpectations(product_id="product1",
rules_df=rules_df,
@@ -2287,7 +2364,55 @@ def test_with_expectations_exception(_fixture_create_database,
spark.sql(f"DROP DATABASE {db.name} CASCADE")
spark.sql("CLEAR CACHE")
+def test_with_expectations_negative_parameter(_fixture_create_database,
+ _fixture_spark_expectations,
+ _fixture_local_kafka_topic):
+ rules_dict = [
+ {
+ "product_id": "product1",
+ "table_name": "dq_spark.test_final_table",
+ "rule_type": "query_dq",
+ "rule": "sum_col1_threshold",
+ "column_name": "col1",
+ "expectation": "(select sum(col1) from {table2}) > 10",
+ "action_if_failed": "ignore",
+ "tag": "validity",
+ "description": "sum of col1 value must be greater than 10",
+ "enable_for_source_dq_validation": True,
+ "enable_for_target_dq_validation": True,
+ "is_active": True,
+ "enable_error_drop_alert": False,
+ "error_drop_threshold": "20",
+ }
+ ]
+
+ rules_df = spark.createDataFrame(rules_dict)
+ writer = WrappedDataFrameWriter().mode("append").format("delta")
+ se = SparkExpectations(product_id="product1",
+ rules_df=rules_df,
+ stats_table="dq_spark.test_dq_stats_table",
+ stats_table_writer=writer,
+ target_and_error_table_writer=writer,
+ debugger=False,
+ )
+ partial_func = se.with_expectations(
+ "dq_spark.test_final_table",
+ user_conf={user_config.se_notifications_on_fail: False}
+ )
+
+ with pytest.raises(SparkExpectationsMiscException,
+ match=r"error occurred while retrieving rules list from the table 'table2'"):
+ # Create a mock object with a list return value
+ mock_func = Mock(return_value=["apple", "banana", "pineapple", "orange"])
+ # Decorate the mock function with required args
+ decorated_func = partial_func(mock_func)
+ decorated_func()
+
+ for db in spark.catalog.listDatabases():
+ if db.name != "default":
+ spark.sql(f"DROP DATABASE {db.name} CASCADE")
+ spark.sql("CLEAR CACHE")
# @patch('spark_expectations.core.expectations.SparkExpectationsNotify', autospec=True,
# spec_set=True)
# @patch('spark_expectations.notifications.push.spark_expectations_notify._notification_hook', autospec=True,