Skip to content

Commit

Permalink
feat: Pandas-Profiling TableColumnStats Extractor (amundsen-io#1105)
Browse files Browse the repository at this point in the history
introduces pandas-profiling based extractor for table column stats
  • Loading branch information
mgorsk1 authored and Zachary Ruiz committed May 13, 2022
1 parent 9cd74a4 commit 6232439
Show file tree
Hide file tree
Showing 8 changed files with 1,279 additions and 4 deletions.
8 changes: 8 additions & 0 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ labelPRBasedOnFilePath:
area:docs:
- docs/**/*
- docs_overrides/**/*
- databuilder/docs/**/*
- databuilder/README.md
- frontend/docs/**/*
- frontend/README.md
- metadata/docs/**/*
- metadata/README.md
- search/docs/**/*
- search/README.md

area:frontend:
- frontend/**/*
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,13 @@ Please visit [Architecture](./architecture/) for Amundsen architecture overview.

Amundsen can also connect to any database that provides `dbapi` or `sql_alchemy` interface (which most DBs provide).

### Table Column Statistics

- [Pandas Profiling](https://pandas-profiling.github.io/pandas-profiling/docs/master/rtd/)

### Dashboard Connectors

- [Apache Superset](https://superset.apache.org/)
- [Mode Analytics](https://mode.com/)
- [Redash](https://redash.io/)
- [Tableau](https://tableau.com/)
Expand All @@ -142,10 +147,6 @@ Amundsen can also connect to any database that provides `dbapi` or `sql_alchemy`

- [Apache Airflow](https://airflow.apache.org/)

### BI Viz Tool

- [Apache Superset](https://superset.incubator.apache.org/)

## Installation

Please visit [Installation guideline](./installation) on how to install Amundsen.
Expand Down
97 changes: 97 additions & 0 deletions databuilder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,103 @@ job = DefaultJob(conf=job_config,
job.launch()
```

### [PandasProfilingColumnStatsExtractor](./databuilder/extractor/pandas_profiling_column_stats_extractor.py)

[Pandas profiling](https://github.com/pandas-profiling/pandas-profiling) is a library commonly used by Data Engineer and Scientists to calculate advanced data profiles on data.
It is run on pandas dataframe and results in json file containing (amongst other things) descriptive and quantile statistics on columns.

#### Required input parameters
- `FILE_PATH` - file path to pandas-profiling **json** report
- `TABLE_NAME` - name of the table for which report was calculated
- `SCHEMA_NAME` - name of the schema from which table originates
- `DATABASE_NAME` - name of database technology from which table originates
- `CLUSTER_NAME` - name of the cluster from which table originates

#### Optional input parameters

- `PRECISION` - precision for metrics of `float` type. Defaults to `3` meaning up to 3 digits after decimal point.
- `STAT_MAPPINGS` - if you wish to collect only selected set of metrics configure this option with dictionary of following format:
- key - raw name of the stat in pandas-profiling
- value - tuple of 2 elements:
- first value of the tuple - full name of the stat (this influences what will be rendered for user in UI)
- second value of the tuple - function modifying the stat (by default we just do type casting)

Such dictionary should in that case contain only keys of metrics you wish to collect.

For example - if you want only min and max value of a column, provide extractor with configuration option:

```python
PandasProfilingColumnStatsExtractor.STAT_MAPPINGS = {'max': ('Maximum', float), 'min': ('Minimum', float)}
```

Complete set of available metrics is defined as DEFAULT_STAT_MAPPINGS attribute of PandasProfilingColumnStatsExtractor.

#### Common usage patterns

As pandas profiling is executed on top of pandas dataframe, it is up to the user to populate the dataframe before running
the report calculation (and subsequently the extractor). While doing so remember that it might not be a good idea to run the
report on a complete set of rows if your tables are very sparse. In such case it is recommended to dump a subset of rows
to pandas dataframe beforehand and calculate the report on just a sample of original data.

##### Spark support

Support for native execution of pandas-profiling on Spark Dataframe is currently worked on and should come in the future.

#### Sample job config

```python
import pandas as pd
import pandas_profiling
from pyhocon import ConfigFactory
from sqlalchemy import create_engine

from databuilder.extractor.pandas_profiling_column_stats_extractor import PandasProfilingColumnStatsExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.task.task import DefaultTask

table_name = 'video_game_sales'
schema_name = 'superset'

# Load table contents to pandas dataframe
db_uri = f'postgresql://superset:superset@localhost:5432/{schema_name}'
engine = create_engine(db_uri, echo=True)

df = pd.read_sql_table(
table_name,
con=engine
)

# Calculate pandas-profiling report on a table
report_file = '/tmp/table_report.json'

report = df.profile_report(sort=None)
report.to_file(report_file)

# Run PandasProfilingColumnStatsExtractor on calculated report
tmp_folder = f'/tmp/amundsen/column_stats'

dict_config = {
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': f'{tmp_folder}/nodes',
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': f'{tmp_folder}/relationships',
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}': False,
'extractor.pandas_profiling.table_name': table_name,
'extractor.pandas_profiling.schema_name': schema_name,
'extractor.pandas_profiling.database_name': 'postgres',
'extractor.pandas_profiling.cluster_name': 'dev',
'extractor.pandas_profiling.file_path': report_file
}

job_config = ConfigFactory.from_dict(dict_config)

task = DefaultTask(extractor=PandasProfilingColumnStatsExtractor(), loader=FsNeo4jCSVLoader())

job = DefaultJob(conf=job_config,
task=task)

job.launch()
```

### [BamboohrUserExtractor](./databuilder/extractor/user/bamboohr/bamboohr_user_extractor.py)

The included `BamboohrUserExtractor` provides support for extracting basic user metadata from [BambooHR](https://www.bamboohr.com/). For companies and organizations that use BambooHR to store employee information such as email addresses, first names, last names, titles, and departments, use the `BamboohrUserExtractor` to populate Amundsen user data.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import json
from typing import (
Any, Dict, Tuple,
)

import dateutil.parser
from pyhocon import ConfigFactory, ConfigTree

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_stats import TableColumnStats


class PandasProfilingColumnStatsExtractor(Extractor):
FILE_PATH = 'file_path'
DATABASE_NAME = 'database_name'
TABLE_NAME = 'table_name'
SCHEMA_NAME = 'schema_name'
CLUSTER_NAME = 'cluster_name'

# if you wish to collect only selected set of metrics configure stat_mappings option of the extractor providing
# similar dictionary but containing only keys of metrics you wish to collect.
# For example - if you want only min and max value of a column, provide extractor with configuration option:
# PandasProfilingColumnStatsExtractor.STAT_MAPPINGS = {'max': ('Maximum', float), 'min': ('Minimum', float)}
STAT_MAPPINGS = 'stat_mappings'

# - key - raw name of the stat in pandas-profiling. Value - tuple of stat spec.
# - first value of the tuple - full name of the stat
# - second value of the tuple - function modifying the stat (by default we just do type casting)
DEFAULT_STAT_MAPPINGS = {
'25%': ('Quantile 25%', float),
'5%': ('Quantile 5%', float),
'50%': ('Quantile 50%', float),
'75%': ('Quantile 75%', float),
'95%': ('Quantile 95%', float),
'chi_squared': ('Chi squared', lambda x: float(x.get('statistic'))),
'count': ('Count', int),
'is_unique': ('Unique', bool),
'kurtosis': ('Kurtosis', float),
'max': ('Maximum', float),
'max_length': ('Maximum length', int),
'mean': ('Mean', float),
'mean_length': ('Mean length', int),
'median_length': ('Median length', int),
'min': ('Minimum', float),
'min_length': ('Minimum length', int),
'monotonic': ('Monotonic', bool),
'n_characters': ('Characters', int),
'n_characters_distinct': ('Distinct characters', int),
'n_distinct': ('Distinct values', int),
'n_infinite': ('Infinite values', int),
'n_missing': ('Missing values', int),
'n_negative': ('Negative values', int),
'n_unique': ('Unique values', int),
'n_zeros': ('Zeros', int),
'p_distinct': ('Distinct values %', lambda x: float(x * 100)),
'p_infinite': ('Infinite values %', lambda x: float(x * 100)),
'p_missing': ('Missing values %', lambda x: float(x * 100)),
'p_negative': ('Negative values %', lambda x: float(x * 100)),
'p_unique': ('Unique values %', lambda x: float(x * 100)),
'p_zeros': ('Zeros %', lambda x: float(x * 100)),
'range': ('Range', float),
'skewness': ('Skewness', float),
'std': ('Std. deviation', float),
'sum': ('Sum', float),
'variance': ('Variance', float)
# Stats available in pandas-profiling but are not collected by default and require custom, conscious config..
# 'block_alias_char_counts': ('',),
# 'block_alias_counts': ('',),
# 'block_alias_values': ('',),
# 'category_alias_char_counts': ('',),
# 'category_alias_counts': ('',),
# 'category_alias_values': ('',),
# 'character_counts': ('',),
# 'cv': ('',),
# 'first_rows': ('',),
# 'hashable': ('',),
# 'histogram': ('',),
# 'histogram_frequencies': ('',),
# 'histogram_length': ('',),
# 'iqr': ('',),
# 'length': ('',),
# 'mad': ('',),
# 'memory_size': ('',),
# 'monotonic_decrease': ('Monotonic decrease', bool),
# 'monotonic_decrease_strict': ('Strict monotonic decrease', bool),
# 'monotonic_increase': ('Monotonic increase', bool),
# 'monotonic_increase_strict': ('Strict monotonic increase', bool),
# 'n': ('',),
# 'n_block_alias': ('',),
# 'n_category': ('Categories', int),
# 'n_scripts': ('',),
# 'ordering': ('',),
# 'script_char_counts': ('',),
# 'script_counts': ('',),
# 'value_counts_index_sorted': ('',),
# 'value_counts_without_nan': ('',),
# 'word_counts': ('',),
# 'type': ('Type', str)
}

PRECISION = 'precision'

DEFAULT_CONFIG = ConfigFactory.from_dict({STAT_MAPPINGS: DEFAULT_STAT_MAPPINGS, PRECISION: 3})

def get_scope(self) -> str:
return 'extractor.pandas_profiling'

def init(self, conf: ConfigTree) -> None:
self.conf = conf.with_fallback(PandasProfilingColumnStatsExtractor.DEFAULT_CONFIG)

self._extract_iter = self._get_extract_iter()

def extract(self) -> Any:
try:
result = next(self._extract_iter)

return result
except StopIteration:
return None

def _get_extract_iter(self) -> Any:
report = self._load_report()

variables = report.get('variables', dict())
report_time = self.parse_date(report.get('analysis', dict()).get('date_start'))

for column_name, column_stats in variables.items():
for _stat_name, stat_value in column_stats.items():
stat_spec = self.stat_mappings.get(_stat_name)

if stat_spec:
stat_name, stat_modifier = stat_spec

if isinstance(stat_value, float):
stat_value = self.round_value(stat_value)

stat = TableColumnStats(table_name=self.table_name, col_name=column_name, stat_name=stat_name,
stat_val=stat_modifier(stat_value), start_epoch=report_time, end_epoch='0',
db=self.database_name, cluster=self.cluster_name, schema=self.schema_name)

yield stat

def _load_report(self) -> Dict[str, Any]:
path = self.conf.get(PandasProfilingColumnStatsExtractor.FILE_PATH)

try:
with open(path, 'r') as f:
_data = f.read()

data = json.loads(_data)

return data
except Exception:
return {}

@staticmethod
def parse_date(string_date: str) -> str:
try:
date_parsed = dateutil.parser.parse(string_date)

# date from pandas-profiling doesn't contain timezone so to be timezone safe we need to assume it's utc
if not date_parsed.tzname():
return PandasProfilingColumnStatsExtractor.parse_date(f'{string_date}+0000')

return str(int(date_parsed.timestamp()))
except Exception:
return '0'

def round_value(self, value: float) -> float:
return round(value, self.conf.get(PandasProfilingColumnStatsExtractor.PRECISION))

@property
def stat_mappings(self) -> Dict[str, Tuple[str, Any]]:
return dict(self.conf.get(PandasProfilingColumnStatsExtractor.STAT_MAPPINGS))

@property
def cluster_name(self) -> str:
return self.conf.get(PandasProfilingColumnStatsExtractor.CLUSTER_NAME)

@property
def database_name(self) -> str:
return self.conf.get(PandasProfilingColumnStatsExtractor.DATABASE_NAME)

@property
def schema_name(self) -> str:
return self.conf.get(PandasProfilingColumnStatsExtractor.SCHEMA_NAME)

@property
def table_name(self) -> str:
return self.conf.get(PandasProfilingColumnStatsExtractor.TABLE_NAME)
Loading

0 comments on commit 6232439

Please sign in to comment.