Merged
36 commits
513b8be
Add Leland's demo notebook
henrydavidge May 15, 2020
1955d38
Merge pull request #3 from henrydavidge/add-nb
henrydavidge May 19, 2020
41d8fba
block_variants_and_samples Transformer to create genotype DataFrame f…
kianfar77 May 19, 2020
27e400e
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng May 20, 2020
dfa6c08
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng May 21, 2020
f5424ee
feat: ridge models for wgr added (#1)
LelandBarnard May 22, 2020
b065560
[HLS-539] Fix compatibility between blocked GT transformer and WGR (#6)
karenfeng May 29, 2020
9778381
Merge branch 'master' of github.com:projectglow/glow
henrydavidge May 29, 2020
35a2383
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 1, 2020
86fab65
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 2, 2020
265370f
Simplify ordering logic in levels code (#7)
henrydavidge Jun 2, 2020
1f32506
Limit Spark memory conf in tests (#9)
karenfeng Jun 2, 2020
f6f00d4
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 3, 2020
cfc08e6
Improve partitioning in block_variants_and_samples transformer (#11)
kianfar77 Jun 5, 2020
f2f30c0
Remove unnecessary header_block grouping (#10)
karenfeng Jun 5, 2020
bcbadd6
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 5, 2020
5bbad57
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 8, 2020
1686138
Create sample ID blocking helper functions (#12)
karenfeng Jun 10, 2020
6bfad34
Add type-checking to WGR APIs (#14)
karenfeng Jun 12, 2020
afaa6df
Add covariate support (#13)
LelandBarnard Jun 12, 2020
cd6c6a1
Flatten estimated phenotypes (#15)
karenfeng Jun 15, 2020
d558115
Add fit_transform function to models (#17)
karenfeng Jun 17, 2020
6b4d968
support alpha inference
karenfeng Jun 18, 2020
859d5da
test fixup
karenfeng Jun 18, 2020
e09f509
more test fixup
karenfeng Jun 18, 2020
f35ffa1
test fixups
karenfeng Jun 19, 2020
79e0eea
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 19, 2020
b03679d
sub-sample
karenfeng Jun 19, 2020
5d61461
test fixup
karenfeng Jun 19, 2020
5052940
address comments - only infer alphas during fit
karenfeng Jun 22, 2020
333e7f9
exception varies
karenfeng Jun 22, 2020
e920d06
Rename levels (#20)
karenfeng Jun 22, 2020
782d1ff
Errors vary by Spark version
karenfeng Jun 22, 2020
939e9bb
Add license headers (#21)
henrydavidge Jun 22, 2020
d3bddc9
Merge branch 'master' of github.com:databricks/glow-wgr into infer-alpha
henrydavidge Jun 22, 2020
fde1099
Merge branch 'infer-alpha2' into infer-alpha-merge
henrydavidge Jun 22, 2020
37 changes: 35 additions & 2 deletions python/glow/wgr/linear_model/functions.py
@@ -16,8 +16,9 @@
from nptyping import Float, Int, NDArray
import numpy as np
import pandas as pd
+from pyspark.sql import DataFrame
from typeguard import typechecked
-from typing import Any, Iterable, List, Tuple
+from typing import Any, Dict, Iterable, List, Tuple


@typechecked
@@ -212,7 +213,7 @@ def new_headers(header_block: str, alpha_names: Iterable[str],


@typechecked
-def r_squared(XB: NDArray[Float], Y: NDArray[Float]):
+def r_squared(XB: NDArray[Float], Y: NDArray[Float]) -> NDArray[(Any, ), Float]:
"""
Computes the coefficient of determination (R2) metric between the matrix resulting from X*B and the matrix of labels
Y.
@@ -228,3 +229,35 @@ def r_squared(XB: NDArray[Float], Y: NDArray[Float]):
tot = np.power(Y - Y.mean(), 2).sum()
res = np.power(Y - XB, 2).sum(axis=0)
return 1 - (res / tot)


@typechecked
def create_alpha_dict(alphas: NDArray[(Any, ), Float]) -> Dict[str, Float]:
"""
Creates a mapping to attach string identifiers to alpha values.

Args:
alphas : Alpha values

Returns:
Dict of [alpha names, alpha values]
"""
return {f'alpha_{i}': a for i, a in enumerate(alphas)}


@typechecked
def generate_alphas(blockdf: DataFrame) -> Dict[str, Float]:
"""
Generates alpha values using a range of heritability values and the number of headers.

Args:
blockdf : Spark DataFrame representing a block matrix

Returns:
Dict of [alpha names, alpha values]
"""
num_headers = blockdf.select('header').distinct().count()
heritability_vals = [0.99, 0.75, 0.50, 0.25, 0.01]
alphas = np.array([num_headers / h for h in heritability_vals])
print(f"Generated alphas: {alphas}")
return create_alpha_dict(alphas)
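For reference, the alpha-generation arithmetic in generate_alphas can be checked without Spark. A minimal numpy sketch, assuming a hypothetical distinct-header count of 2 (the real function derives the count from the block DataFrame):

import numpy as np

# Hypothetical stand-in for blockdf.select('header').distinct().count()
num_headers = 2
heritability_vals = [0.99, 0.75, 0.50, 0.25, 0.01]

# One alpha per assumed heritability value, as in generate_alphas
alphas = np.array([num_headers / h for h in heritability_vals])

# Attach string identifiers, as in create_alpha_dict
alpha_dict = {f'alpha_{i}': a for i, a in enumerate(alphas)}
print(alpha_dict)  # {'alpha_0': 2.0202..., ..., 'alpha_4': 200.0}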
17 changes: 11 additions & 6 deletions python/glow/wgr/linear_model/ridge_model.py
@@ -31,16 +31,16 @@ class RidgeReducer:
block with L columns to begin with will be reduced to a block with K columns, where each column is the prediction
of one ridge model for one target label.
"""
-    def __init__(self, alphas: NDArray[(Any, ), Float]) -> None:
+    def __init__(self, alphas: NDArray[(Any, ), Float] = np.array([])) -> None:
"""
RidgeReducer is initialized with a list of alpha values.

Args:
-            alphas : array_like of alpha values used in the ridge reduction
+            alphas : array_like of alpha values used in the ridge reduction (optional).
"""
if not (alphas >= 0).all():
raise Exception('Alpha values must all be non-negative.')
-        self.alphas = {f'alpha_{i}': a for i, a in enumerate(alphas)}
+        self.alphas = create_alpha_dict(alphas)

def fit(
self,
@@ -69,6 +69,8 @@ def fit(
if 'label' in blockdf.columns:
map_key_pattern.append('label')
reduce_key_pattern.append('label')
+        if not self.alphas:
+            self.alphas = generate_alphas(blockdf)

map_udf = pandas_udf(
lambda key, pdf: map_normal_eqn(key, map_key_pattern, pdf, labeldf, sample_blocks, covdf
@@ -163,16 +165,16 @@ class RidgeRegression:
coefficients. The optimal ridge alpha value is chosen for each label by maximizing the average out of fold r2
score.
"""
-    def __init__(self, alphas: NDArray[(Any, ), Float]) -> None:
+    def __init__(self, alphas: NDArray[(Any, ), Float] = np.array([])) -> None:
"""
RidgeRegression is initialized with a list of alpha values.

Args:
-            alphas : array_like of alpha values used in the ridge regression
+            alphas : array_like of alpha values used in the ridge regression (optional).
"""
if not (alphas >= 0).all():
raise Exception('Alpha values must all be non-negative.')
-        self.alphas = {f'alpha_{i}': a for i, a in enumerate(alphas)}
+        self.alphas = create_alpha_dict(alphas)

def fit(
self,
@@ -201,6 +203,9 @@ def fit(
map_key_pattern = ['sample_block', 'label']
reduce_key_pattern = ['header_block', 'header', 'label']

+        if not self.alphas:
+            self.alphas = generate_alphas(blockdf)

map_udf = pandas_udf(
lambda key, pdf: map_normal_eqn(key, map_key_pattern, pdf, labeldf, sample_blocks, covdf
), normal_eqn_struct, PandasUDFType.GROUPED_MAP)
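A quick numpy-only check of why the new empty-array default is safe: the non-negativity guard passes vacuously on an empty array, and the resulting empty alpha dict is falsy, which is exactly the condition that triggers alpha inference in fit. A minimal sketch, with no Spark session required:

import numpy as np

alphas = np.array([])       # the new default in both constructors
assert (alphas >= 0).all()  # vacuously True, so the guard does not raise

alpha_dict = {f'alpha_{i}': a for i, a in enumerate(alphas)}
assert not alpha_dict       # an empty dict is falsy ...

if not alpha_dict:          # ... so fit() falls back to generate_alphas(blockdf)
    print('alphas would be inferred from the block DataFrame at fit time')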
16 changes: 16 additions & 0 deletions python/glow/wgr/linear_model/tests/test_functions.py
@@ -15,6 +15,7 @@
from glow.wgr.linear_model.functions import *
import numpy as np
import pandas as pd
+from pyspark.sql import Row
import pytest


@@ -58,3 +59,18 @@ def test_assemble_block_zero_sig():
df = pd.DataFrame({'mu': [0.2, 0], 'sig': [0.1, 0], 'values': [[0.1, 0.3], [0, 0]]})
with pytest.raises(ValueError):
assemble_block(n_rows=2, n_cols=2, pdf=df, cov_matrix=np.array([[]]))


def test_generate_alphas(spark):
df = spark.createDataFrame(
[Row(header='header_one'),
Row(header='header_one'),
Row(header='header_two')])
expected_alphas = {
'alpha_0': np.float(2 / 0.99),
'alpha_1': np.float(2 / 0.75),
'alpha_2': np.float(2 / 0.5),
'alpha_3': np.float(2 / 0.25),
'alpha_4': np.float(2 / 0.01)
}
assert generate_alphas(df) == expected_alphas
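The expected values above follow from the two distinct headers in the three-row DataFrame. A self-contained sketch of that count, assuming only a local pyspark installation (the session setup is illustrative, not part of the test suite):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()
df = spark.createDataFrame(
    [Row(header='header_one'),
     Row(header='header_one'),
     Row(header='header_two')])

# Two distinct headers, so every expected alpha is 2 / heritability
assert df.select('header').distinct().count() == 2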
95 changes: 87 additions & 8 deletions python/glow/wgr/linear_model/tests/test_ridge_regression.py
@@ -14,6 +14,8 @@

from glow.wgr.linear_model import RidgeReducer, RidgeRegression
from glow.wgr.linear_model.ridge_model import *
+from glow.wgr.linear_model.functions import generate_alphas
+import pytest

data_root = 'test-data/wgr/ridge-regression'

@@ -47,6 +49,11 @@ def __get_sample_blocks(indexdf):
}


+def __assert_dataframes_equal(df1, df2):
+    assert df1.subtract(df2).count() == 0
+    assert df2.subtract(df1).count() == 0


def test_map_normal_eqn(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
@@ -493,7 +500,7 @@ def test_two_level_regression_with_cov(spark):
def test_tie_break(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
-    blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet')
+    blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)

@@ -510,7 +517,7 @@ def test_reducer_fit_transform(spark):
def test_reducer_fit_transform(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
-    blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet')
+    blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)

@@ -519,24 +526,96 @@ def test_reducer_fit_transform(spark):
level1df = stack0.transform(blockdf, labeldf, group2ids, model0df)
fit_transform_df = stack0.fit_transform(blockdf, labeldf, group2ids)

-    assert fit_transform_df.subtract(level1df).count() == 0
-    assert level1df.subtract(fit_transform_df).count() == 0
+    __assert_dataframes_equal(fit_transform_df, level1df)


def test_regression_fit_transform(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
-    blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet')
+    blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)

stack0 = RidgeReducer(alphas)
-    model0df = stack0.fit(blockdf, labeldf, group2ids)
-    level1df = stack0.transform(blockdf, labeldf, group2ids, model0df)
+    level1df = stack0.fit_transform(blockdf, labeldf, group2ids)

regressor = RidgeRegression(alphas)
model1df, cvdf = regressor.fit(level1df, labeldf, group2ids)
yhatdf = regressor.transform(level1df, labeldf, group2ids, model1df, cvdf)
fit_transform_df = regressor.fit_transform(level1df, labeldf, group2ids)

assert fit_transform_df.equals(yhatdf)


def test_reducer_generate_alphas(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)

stack_without_alphas = RidgeReducer()
stack_with_alphas = RidgeReducer(np.array(sorted(list(generate_alphas(blockdf).values()))))

model0_without_alphas = stack_without_alphas.fit(blockdf, labeldf, group2ids)
model0df = stack_with_alphas.fit(blockdf, labeldf, group2ids)
__assert_dataframes_equal(model0_without_alphas, model0df)

level1_without_alphas = stack_without_alphas.transform(blockdf, labeldf, group2ids, model0df)
level1df = stack_with_alphas.transform(blockdf, labeldf, group2ids, model0df)
__assert_dataframes_equal(level1_without_alphas, level1df)


def test_regression_generate_alphas(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)
stack0 = RidgeReducer(alphas)
level1df = stack0.fit_transform(blockdf, labeldf, group2ids)

regressor_without_alphas = RidgeRegression()
regressor_with_alphas = RidgeRegression(
np.array(sorted(list(generate_alphas(level1df).values()))))

model1_without_alphas, cv_without_alphas = regressor_without_alphas.fit(
level1df, labeldf, group2ids)
model1df, cvdf = regressor_with_alphas.fit(level1df, labeldf, group2ids)
__assert_dataframes_equal(model1_without_alphas, model1df)
__assert_dataframes_equal(cv_without_alphas, cvdf)

yhat_without_alphas = regressor_without_alphas.transform(level1df, labeldf, group2ids, model1df,
cvdf)
yhatdf = regressor_with_alphas.transform(level1df, labeldf, group2ids, model1df, cvdf)
assert yhat_without_alphas.equals(yhatdf)


def test_reducer_missing_alphas(spark):
indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)
stack_fit = RidgeReducer()
stack_transform = RidgeReducer()

model0df = stack_fit.fit(blockdf, labeldf, group2ids)
level1df = stack_transform.transform(blockdf, labeldf, group2ids, model0df)
with pytest.raises(Exception):
level1df.collect()


def test_regression_missing_alphas(spark):

indexdf = spark.read.parquet(f'{data_root}/groupedIDs.snappy.parquet')
blockdf = spark.read.parquet(f'{data_root}/blockedGT.snappy.parquet').limit(5)

group2ids = __get_sample_blocks(indexdf)
stack0 = RidgeReducer(alphas)
level1df = stack0.fit_transform(blockdf, labeldf, group2ids)

regressor_fit = RidgeRegression()
regressor_transform = RidgeRegression()

model1df, cvdf = regressor_fit.fit(level1df, labeldf, group2ids)
with pytest.raises(Exception):
y_hat = regressor_transform.transform(level1df, labeldf, group2ids, model1df, cvdf)
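A closing note on the __assert_dataframes_equal helper used by the new tests: it checks order-insensitive equality via two subtracts. A minimal standalone illustration, assuming only a local pyspark installation (note that subtract is set-based, so duplicate row counts are also disregarded):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()
df1 = spark.createDataFrame([Row(x=1), Row(x=2)])
df2 = spark.createDataFrame([Row(x=2), Row(x=1)])

# Equal as sets: the difference must be empty in both directions
assert df1.subtract(df2).count() == 0
assert df2.subtract(df1).count() == 0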