feat: observability-as-code #479

Merged 7 commits on Mar 29, 2022
14 changes: 8 additions & 6 deletions e2e_samples/parking_sensors/README.md
@@ -133,8 +133,9 @@ The following summarizes key learnings and best practices demonstrated by this sample

### 7. Monitor infrastructure, pipelines and data

- A proper monitoring solution should be in place to ensure failures are identified, diagnosed, and addressed in a timely manner. Aside from the base infrastructure and pipeline runs, data quality should also be monitored. A common area that should have data monitoring is the malformed record store.
- As an example, this repository showcases how to use the open-source framework [Great Expectations](https://docs.greatexpectations.io/docs/) to define, measure, and report data quality metrics at different stages of the data pipeline. Captured data quality metrics are reported to Azure Monitor for further visualization and alerting. Take a look at the sample [Data Quality report](docs/images/data_quality_report.png) generated with an Azure Monitor workbook. Great Expectations can also be configured to generate HTML reports and host them directly as a static site on Azure Blob Storage; read more on [How to host and share Data Docs on Azure Blob Storage](https://legacy.docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_docs/how_to_host_and_share_data_docs_on_azure_blob_storage.html). A minimal configuration sketch follows below.
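
A minimal sketch of that Data Docs configuration in Python, following the guide linked above (the connection string is a placeholder):

```python
# Sketch of a Data Docs site backed by Azure Blob Storage, per the linked guide.
# "$web" is the storage account's static-website container.
data_docs_sites = {
    "az_site": {
        "class_name": "SiteBuilder",
        "store_backend": {
            "class_name": "TupleAzureBlobStoreBackend",
            "container": "$web",
            "connection_string": "<azure-storage-connection-string>",  # placeholder
        },
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    }
}
```

This block can then be passed to the `DataContextConfig` via its `data_docs_sites` argument (the notebook below shows the rest of that configuration).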

## Key Concepts

### Build and Release Pipeline
@@ -194,6 +195,8 @@ More resources:

### Observability / Monitoring

**Observability-as-Code** - A few key components of Observability and Monitoring are deployed and configured through Observability-as-Code at the time of Azure resource deployment. This includes a Log Analytics workspace to collect monitoring data from key resources, a central Azure dashboard to monitor key metrics, and alerts to monitor the data pipelines. To learn more about monitoring a specific service, see the sections below.
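
In this sample those pieces are provisioned through the ARM templates; purely as an illustration, an equivalent pipeline-failure alert for the Data Factory could be created with the Azure Python SDK (all `<...>` names are placeholders, and `azure-identity`/`azure-mgmt-monitor` are assumed to be installed):

```python
# Illustrative sketch only: the sample itself deploys alerts via ARM templates.
from azure.identity import DefaultAzureCredential
from azure.mgmt.monitor import MonitorManagementClient
from azure.mgmt.monitor.models import (
    MetricAlertResource,
    MetricAlertSingleResourceMultipleMetricCriteria,
    MetricCriteria,
)

subscription_id = "<subscription-id>"
client = MonitorManagementClient(DefaultAzureCredential(), subscription_id)

factory_id = (
    f"/subscriptions/{subscription_id}/resourceGroups/<resource-group>"
    "/providers/Microsoft.DataFactory/factories/<data-factory-name>"
)
client.metric_alerts.create_or_update(
    "<resource-group>",
    "adf-pipeline-failed-runs",
    MetricAlertResource(
        location="global",            # metric alert rules are a global resource
        description="Fire when any ADF pipeline run fails",
        severity=1,
        enabled=True,
        scopes=[factory_id],
        evaluation_frequency="PT5M",  # how often the rule is evaluated
        window_size="PT15M",          # lookback window over the metric
        criteria=MetricAlertSingleResourceMultipleMetricCriteria(
            all_of=[
                MetricCriteria(
                    name="failed_runs",
                    metric_name="PipelineFailedRuns",  # built-in ADF metric
                    operator="GreaterThan",
                    threshold=0,
                    time_aggregation="Total",
                )
            ]
        ),
    ),
)
```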

#### Databricks

- [Monitoring Azure Databricks with Azure Monitor](https://docs.microsoft.com/en-us/azure/architecture/databricks-monitoring/)
@@ -260,9 +263,8 @@ More resources:
- **DEPLOYMENT_ID** - string appended to all resource names. This is to ensure uniqueness of Azure resource names. *Default*: random five-character string.
- **AZDO_PIPELINES_BRANCH_NAME** - git branch where Azure DevOps pipelines definitions are retrieved from. *Default*: main.
- **AZURESQL_SERVER_PASSWORD** - Password of the SQL Server instance. *Default*: random string.

4. To further customize the solution, set parameters in `arm.parameters` files located in the `infrastructure` folder.
- To enable the Observability and Monitoring components through code (Observability-as-Code), set the `enable_monitoring` parameter to `true` in the `arm.parameters` files located in the `infrastructure` folder. This will deploy a Log Analytics workspace to collect monitoring data from key resources, set up an Azure dashboard to monitor key metrics, and configure alerts for ADF pipelines (a small sketch for flipping this toggle follows the deployment steps below).
2. **Deploy Azure resources**
1. Clone locally the imported Github Repo, then `cd` into the `e2e_samples/parking_sensors` folder of the repo
2. Run `./deploy.sh`.
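
As a convenience, the `enable_monitoring` toggle mentioned in the customization step above could be flipped across every parameter file with a short script; the `arm.parameters*.json` glob below is an assumption about the actual file names, so adjust it to the repo layout:

```python
# Sketch: enable the monitoring components in every ARM parameter file.
import json
from pathlib import Path

for params_path in Path("infrastructure").glob("**/arm.parameters*.json"):
    params = json.loads(params_path.read_text())
    params["parameters"]["enable_monitoring"] = {"value": True}  # ARM parameter-file shape
    params_path.write_text(json.dumps(params, indent=2) + "\n")
```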
@@ -332,7 +334,7 @@ After a successful deployment, you should have the following resources:
- SparkSQL tables created
- ADLS Gen2 mounted at `dbfs:/mnt/datalake` using the Storage Service Principal.
- Databricks KeyVault secrets scope created
- **Log Analytics Workspace** - including a Kusto query under Query explorer -> Saved queries, to verify results that will be logged by the Synapse notebooks (notebooks are not deployed yet).
- **Azure Synapse SQL Dedicated Pool (formerly SQLDW)** - currently, empty. The Release Pipeline will deploy the SQL Database objects.
- **Azure Synapse Spark Pool** - currently, empty. Configured to point to the deployed Log Analytics workspace, under "Apache Spark Configuration".
- **Azure Synapse Workspace** - currently, empty.
168 changes: 145 additions & 23 deletions e2e_samples/parking_sensors/databricks/notebooks/02_standardize.py
@@ -1,4 +1,9 @@
# Databricks notebook source
# MAGIC %pip install great-expectations==0.14.12
# MAGIC %pip install opencensus-ext-azure==1.1.3

# COMMAND ----------

dbutils.widgets.text("infilefolder", "", "In - Folder Path")
infilefolder = dbutils.widgets.get("infilefolder")

@@ -7,16 +12,13 @@

# COMMAND ----------

import os
import datetime

# For testing
# infilefolder = '2022_03_23_10_28_02/'
# loadid = 1

load_id = loadid
loaded_on = datetime.datetime.now()
base_path = os.path.join('dbfs:/mnt/datalake/data/lnd/', infilefolder)
@@ -61,26 +63,146 @@
# COMMAND ----------

# MAGIC %md
# MAGIC ### Data Quality
# MAGIC The following uses the [Great Expectations](https://greatexpectations.io/) library. See the [Great Expectations docs](https://docs.greatexpectations.io/docs/) for more info.
# MAGIC
# MAGIC **Note**: for simplification purposes, the [Expectation Suite](https://docs.greatexpectations.io/docs/terms/expectation_suite) is created inline. Generally it should be created prior to data pipeline execution, then only loaded at runtime and executed against a data [Batch](https://docs.greatexpectations.io/docs/terms/batch/) via a [Checkpoint](https://docs.greatexpectations.io/docs/terms/checkpoint/). A sketch of that runtime-only pattern follows the checkpoint run below.

# COMMAND ----------

import datetime
import pandas as pd
from ruamel import yaml
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    FilesystemStoreBackendDefaults,
)
from pyspark.sql import SparkSession, Row


root_directory = "/dbfs/great_expectations/"

# 1. Configure DataContext
# https://docs.greatexpectations.io/docs/terms/data_context
data_context_config = DataContextConfig(
    datasources={
        "parkingbay_data_source": DatasourceConfig(
            class_name="Datasource",
            execution_engine={"class_name": "SparkDFExecutionEngine"},
            data_connectors={
                "parkingbay_data_connector": {
                    "module_name": "great_expectations.datasource.data_connector",
                    "class_name": "RuntimeDataConnector",
                    "batch_identifiers": [
                        "environment",
                        "pipeline_run_id",
                    ],
                }
            },
        )
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=root_directory),
)
context = BaseDataContext(project_config=data_context_config)


# 2. Create a BatchRequest based on parkingbay_sdf dataframe.
# https://docs.greatexpectations.io/docs/terms/batch
batch_request = RuntimeBatchRequest(
    datasource_name="parkingbay_data_source",
    data_connector_name="parkingbay_data_connector",
    data_asset_name="parkingbay_data_asset",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "environment": "stage",
        "pipeline_run_id": "pipeline_run_id",  # static placeholder; could carry the actual load id
    },
    runtime_parameters={"batch_data": parkingbay_sdf},  # Your dataframe goes here
)


# 3. Define the Expectation Suite and corresponding Data Expectations
# https://docs.greatexpectations.io/docs/terms/expectation_suite
expectation_suite_name = "parkingbay_data_exception_suite_basic"
context.create_expectation_suite(expectation_suite_name=expectation_suite_name, overwrite_existing=True)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
# Add Validations to the suite
# Check available expectations: validator.list_available_expectation_types()
# https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/expectations/index.html
# https://legacy.docs.greatexpectations.io/en/latest/reference/core_concepts/expectations/standard_arguments.html#meta
validator.expect_column_values_to_not_be_null(column="meter_id")
validator.expect_column_values_to_not_be_null(column="marker_id")
validator.expect_column_values_to_be_of_type(column="rd_seg_dsc", type_="StringType")
validator.expect_column_values_to_be_of_type(column="rd_seg_id", type_="IntegerType")
# validator.validate()  # To run validations without a checkpoint
validator.save_expectation_suite(discard_failed_expectations=False)



# 4. Configure a checkpoint and run the Expectation Suite using the checkpoint
# https://docs.greatexpectations.io/docs/terms/checkpoint
my_checkpoint_name = "Parkingbay Data DQ"
checkpoint_config = {
    "name": my_checkpoint_name,
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
}
my_checkpoint = context.test_yaml_config(yaml.dump(checkpoint_config))
context.add_checkpoint(**checkpoint_config)
# Run Checkpoint passing in expectation suite.
checkpoint_result = context.run_checkpoint(
    checkpoint_name=my_checkpoint_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)

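# In production (per the note at the top of this section), the Expectation
# Suite would be authored ahead of time and versioned in the configured store;
# the runtime cell then reduces to loading the stored suite and running the
# checkpoint, with no inline suite creation. A sketch, assuming the suite
# already exists in the store:
#
#   suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
#   checkpoint_result = context.run_checkpoint(
#       checkpoint_name=my_checkpoint_name,
#       validations=[{
#           "batch_request": batch_request,
#           "expectation_suite_name": suite.expectation_suite_name,
#       }],
#   )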

# COMMAND ----------

# MAGIC %md
# MAGIC ### Data Quality Metric Reporting
# MAGIC
# MAGIC This parses the results of the checkpoint and sends them to Application Insights / Azure Monitor for reporting.

# COMMAND ----------

import logging
import time
from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
logger.addHandler(AzureLogHandler(connection_string=dbutils.secrets.get(scope = "storage_scope", key = "applicationInsightsConnectionString")))

# Parse the checkpoint result and flatten each expectation outcome into a
# dict of custom dimensions, keyed as "<expectation_type>_on_<column>".
result_dict = checkpoint_result.to_json_dict()
run_key = next(iter(result_dict['run_results']))
results = result_dict['run_results'][run_key]['validation_result']['results']

checks = {'check_name': checkpoint_result['checkpoint_config']['name'], 'pipelinerunid': loadid}
for result in results:
    expectation = result['expectation_config']
    validation_name = expectation['expectation_type'] + "_on_" + expectation['kwargs']['column']
    checks[validation_name] = result['success']

properties = {'custom_dimensions': checks}

# One log record per run; the log level reflects the overall pass/fail outcome.
if checkpoint_result.success:
    logger.setLevel(logging.INFO)
    logger.info('verifychecks', extra=properties)
else:
    logger.setLevel(logging.ERROR)
    logger.error('verifychecks', extra=properties)

# AzureLogHandler exports telemetry on a background timer (15s by default);
# wait briefly so the record is flushed before the job/cluster shuts down.
time.sleep(16)


# COMMAND ----------

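# The exported records land in Application Insights / Azure Monitor, where they
# can be visualized or alerted on. A sketch of a Kusto query over the traces
# table (exact table and column names depend on the workspace configuration):
#
#   traces
#   | where message == "verifychecks"
#   | extend checks = todynamic(tostring(customDimensions))
#   | project timestamp, checks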