This is a data pipeline for processing large healthcare text datasets. It pseudonymises the data and extracts additional information from it using Microsoft Presidio and Azure Text Analytics for Health. It also demonstrates an approach to writing Spark pipelines in a testable, CI-friendly way that supports efficient incremental data processing.
First, go through the Quick Start guide to set up this repository and your FlowEHR instance.
The above link takes you through defining a file `config.local.yaml`. The `patient_notes` pipeline requires some special setup, so there is a file `config.local.yaml` in this directory that contains a sample configuration you could use. Replace the values that have `REPLACE_ME` set to customize your deployment, and leave the rest as-is.
The values that need to be replaced are:
- `flowehr_id`: A unique identifier to apply to all deployed resources (e.g. `myflwr`). Must be 12 characters or less.
- `location` and `cognitive-services-location`: Set to the Azure region you are using, e.g. `uksouth`.
- `storage_account_name` and `resource_group_name` in the `unity_catalog_metastore` section: a Unity Catalog Metastore can only be created once per Azure tenant. If your tenant does not have a Unity Catalog Metastore deployed, pick any suitable globally unique value for `storage_account_name` and any suitable name for `resource_group_name`.
- `databricks_account_id`: The ID of your Databricks account (at the tenant level). Follow the official documentation to obtain it: Locate your Account ID.
- `cognitive-services-keys` and `cognitive-services-location`: To run the Feature Extraction part of the pipeline, you need a Language service deployed in Azure. It is not deployed automatically during the FlowEHR deployment, so you'll need to create it manually. See the screenshot below.
- `azure-tenant-id`: The Azure Tenant ID (also sometimes known as the Directory ID) for your Azure tenant.
After deploying FlowEHR, you will need to change one setting for the metrics to be displayed correctly. Head to the Application Insights resource deployed in your resource group; it should have a name like `transform-ai-${flowehr_id}-dev`. Go to Usage and estimated costs, click on Custom metrics (preview), and make sure custom metrics are sent to Azure with dimensions enabled:
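For context, these dimensioned custom metrics are what later allow the dashboard to split the rows-inserted count by activity and operation. Below is a minimal sketch of how a custom metric with a dimension can be sent to Application Insights using the OpenCensus Azure exporter; the repo's monitoring.py may wrap this differently, and the metric and dimension names here are only illustrative:

```python
from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation, measure, stats, view
from opencensus.tags import tag_key, tag_map, tag_value

# Illustrative metric: rows inserted, with an "activity" dimension.
rows_measure = measure.MeasureInt("rows_inserted", "Rows inserted by the pipeline", "rows")
activity_key = tag_key.TagKey("activity")

rows_view = view.View(
    "rows_inserted", "Rows inserted by the pipeline",
    [activity_key], rows_measure, aggregation.SumAggregation(),
)

stats_instance = stats.stats
stats_instance.view_manager.register_view(rows_view)
stats_instance.view_manager.register_exporter(
    metrics_exporter.new_metrics_exporter(
        connection_string="InstrumentationKey=<app-insights-key>"  # placeholder
    )
)

# Record 42 rows inserted by the pseudonymisation activity; the "activity" tag
# becomes a custom dimension on the metric in Application Insights.
mmap = stats_instance.stats_recorder.new_measurement_map()
tags = tag_map.TagMap()
tags.insert(activity_key, tag_value.TagValue("pseudonymisation"))
mmap.measure_int_put(rows_measure, 42)
mmap.record(tags)
```

With dimensions enabled as described above, the `activity` tag can then be used as a custom metric split in charts and dashboards.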
Head over to the Databricks service created in your resource group, and import the `IngestDummyData.ipynb` notebook there.
To run the notebook, you need to create your own personal cluster (the FlowEHR cluster created as part of the FlowEHR deployment can only be used by the ADF instance). To do this, create a cluster with your desired configuration, make sure to select Single user as the Access mode, and copy the Spark config and Environment variables sections from the FlowEHR cluster. See screenshot:
Follow the instructions in the notebook to initialize the test data to run the pipeline.
To trigger the pipeline, head to the ADF instance in the resource group you deployed in step 1 (it will have a name like `adf-${flowehr_id}-dev`) and trigger the PatientsPipeline (click on Add Trigger - Trigger Now). See screenshot:
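If you prefer to start a run programmatically rather than through the portal, a rough sketch using the azure-mgmt-datafactory SDK could look like this; the subscription ID, resource group, and factory names below are placeholders for your own deployment:

```python
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

# Placeholder values - replace with the details of your own deployment.
subscription_id = "<subscription-id>"
resource_group = "<resource-group-name>"
factory_name = "adf-<flowehr_id>-dev"
pipeline_name = "PatientsPipeline"

client = DataFactoryManagementClient(DefaultAzureCredential(), subscription_id)

# Equivalent of "Add Trigger - Trigger Now" in the ADF UI.
run = client.pipelines.create_run(resource_group, factory_name, pipeline_name)
print(f"Started pipeline run: {run.run_id}")

# Check the status of the run.
status = client.pipeline_runs.get(resource_group, factory_name, run.run_id)
print(f"Run status: {status.status}")
```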
To check the logs, head to the Application Insights service created in your resource group, as described in step 2. There, head to the Logs section. To see the logs created by the pipeline, query the `traces` table. See screenshot:
To check the metrics dashboard, look for the `PatientNotesPipelineStatusDashboard` created in your resource group. See screenshot:
Note: You might want to add further dimensions to the custom split, so that the dashboard shows rows inserted broken down by `activity` (`pseudonymisation` or `feature_extraction`) and `operation` (`insert` or `delete`).
Optionally, you can use a SQL Warehouse to query data in Unity Catalog. To do so, you will need to create a SQL Warehouse; the default settings will do:
Once it's created, head to the SQL Editor view in Databricks, where you can write SQL queries to quickly examine the data.
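You can also query the warehouse programmatically. Here is a small sketch using the databricks-sql-connector package; the hostname, HTTP path, token, and table name are placeholders, and the actual catalog/schema/table names depend on your pipeline configuration:

```python
from databricks import sql

# Connection details are shown on the SQL Warehouse's "Connection details" tab.
# All values below are placeholders.
with sql.connect(
    server_hostname="adb-<workspace-id>.<n>.azuredatabricks.net",
    http_path="/sql/1.0/warehouses/<warehouse-id>",
    access_token="<personal-access-token>",
) as connection:
    with connection.cursor() as cursor:
        # Hypothetical table name - adjust to your Unity Catalog layout.
        cursor.execute("SELECT * FROM my_catalog.my_schema.patient_notes LIMIT 10")
        for row in cursor.fetchall():
            print(row)
```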
To learn more about the design principles behind this pipeline, head to the design document.
These are the files that are useful to explore:
- entrypoint.py: Entrypoint for the pipeline; this is where pipeline execution starts.
- transform.py: File that defines transformations.
- Tests: Tests for the above transformations.
- Test configuration: Helper fixture using [] for writing unit tests with PySpark
- db.py: Helpers for working with Microsoft SQL database
- datalake.py: File that contains helper methods for working with Data Lake.
- watermark.py: Contains the logic for the watermark algorithm used for incremental updates (see the sketch after this list).
- monitoring.py: Helpers for sending logs and metrics to Azure Monitor.
- Makefile: Provides command shortcuts; certain targets are expected to be defined to ensure successful deployment of the pipeline to Azure.
- pyproject.toml: Defines how the Python wheel containing all of the pipeline code is built.
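To illustrate the incremental-update approach that watermark.py is responsible for, here is a minimal, self-contained sketch of watermark-based reading in PySpark. The table and column names are hypothetical, and the repository's actual implementation will differ in detail:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical table and column names, for illustration only.
SOURCE_TABLE = "ingest.patient_notes"    # raw notes with a last_updated timestamp
WATERMARK_TABLE = "meta.watermarks"      # one row per pipeline: (pipeline, last_timestamp)


def read_watermark(pipeline: str):
    """Return the last processed timestamp for this pipeline, or None on the first run."""
    rows = (
        spark.table(WATERMARK_TABLE)
        .filter(F.col("pipeline") == pipeline)
        .select("last_timestamp")
        .collect()
    )
    return rows[0]["last_timestamp"] if rows else None


def read_incremental_batch(pipeline: str):
    """Read only the source rows that changed since the last recorded watermark."""
    source = spark.table(SOURCE_TABLE)
    watermark = read_watermark(pipeline)
    if watermark is not None:
        source = source.filter(F.col("last_updated") > F.lit(watermark))
    return source


def update_watermark(pipeline: str, batch):
    """Advance the watermark to the newest timestamp seen in the processed batch."""
    new_max = batch.agg(F.max("last_updated").alias("ts")).collect()[0]["ts"]
    if new_max is not None:
        (
            spark.createDataFrame([(pipeline, new_max)], ["pipeline", "last_timestamp"])
            .write.mode("append")  # simplified: a real implementation would upsert
            .saveAsTable(WATERMARK_TABLE)
        )
```

Processing a batch and then advancing the watermark in this way means a re-run only picks up rows added or updated since the previous successful run.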