From cca7171a8d62dde2edd4e28d35115179256fdce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?arthur=20b=C3=B6=C3=B6k?= Date: Sat, 29 Jul 2023 15:59:00 -0700 Subject: [PATCH] conditional probas for symptoms --- dr_claude/mcts/base.py | 144 ++++++++++++++++++++------------ tests/test_conditional_proba.py | 49 ++++++++++- 2 files changed, 136 insertions(+), 57 deletions(-) diff --git a/dr_claude/mcts/base.py b/dr_claude/mcts/base.py index b645dbd..c83e13b 100644 --- a/dr_claude/mcts/base.py +++ b/dr_claude/mcts/base.py @@ -1,83 +1,121 @@ -from typing import Collection, Dict +from typing import Annotated, Collection, Dict, TypeVar import numpy as np from dr_claude import datamodels -def compute_condition_probabilities( +T = TypeVar("T") + + +class VectorTransformer: + @staticmethod + def to_dictionary(vector: np.ndarray, index: Dict[T, int]) -> Dict[T, float]: + return {item: vector[i] for item, i in index.items()} + + +def compute_symptom_posterior_flat_prior_dict( + matrix: datamodels.ProbabilityMatrix, + condition_probas: Annotated[np.ndarray, "p(condition)"], +) -> Annotated[ + Dict[datamodels.Symptom, float], + "p(symptom) | (pertinent_positives, pertinent_negatives)", +]: + """ + Compute the probability of each symptom given a set of condition probabiltiiies + returns a vector of size m with p(symptom) | conditional_probas where m is the number of conditions + ASSUMES THAT ALL CONDITIONS ARE EQUALLY LIKELY (p(c) = 1 / n) + """ + + symptom_probas = compute_symptom_posterior_flat_prior(matrix, condition_probas) + return VectorTransformer.to_dictionary(symptom_probas, matrix.rows) + + +def compute_symptom_posterior_flat_prior( + matrix: datamodels.ProbabilityMatrix, + condition_probas: Annotated[np.ndarray, "p(condition)"], +) -> Annotated[np.ndarray, "p(symptom) | (pertinent_positives, pertinent_negatives)"]: + """ + Compute the probability of each symptom given a set of condition probabiltiiies + returns a vector of size m with p(symptom) | conditional_probas where m is the number of conditions + ASSUMES THAT ALL CONDITIONS ARE EQUALLY LIKELY (p(c) = 1 / n) + """ + return np.sum(matrix.matrix * condition_probas, axis=1) + + +def compute_condition_posterior_flat_prior_dict( matrix: datamodels.ProbabilityMatrix, pertinent_positives: Collection[datamodels.Symptom], pertinent_negatives: Collection[datamodels.Symptom], -) -> Dict[datamodels.Condition, float]: +) -> Annotated[ + Dict[datamodels.Condition, float], + "p(condition) | (pertinent_positives, pertinent_negatives)", +]: + condition_probas = compute_condition_posterior_flat_prior( + matrix, + pertinent_positives, + pertinent_negatives, + ) + return VectorTransformer.to_dictionary(condition_probas, matrix.columns) + + +def compute_condition_posterior_flat_prior( + matrix: datamodels.ProbabilityMatrix, + pertinent_positives: Collection[datamodels.Symptom], + pertinent_negatives: Collection[datamodels.Symptom], +) -> Annotated[np.ndarray, "p(condition) | (pertinent_positives, pertinent_negatives)"]: """ Compute the probability of each condition given the pertinent positives and pertinent negatives - :param matrix: The probability matrix - :param pertinent_positives: The pertinent positives - :param pertinent_negatives: The pertinent negatives - :return: A dictionary of condition probabilities + returns a vector of size n with p(condition) | (pertinent_positives, pertinent_negatives) where n is the number of conditions + ASSUMES THAT ALL CONDITIONS ARE EQUALLY LIKELY (p(c) = 1 / n) """ - conditional_log_probas = np.zeros(matrix.matrix.shape[1]) - # Compute the log probability of each condition given the pertinent positives and pertinent negatives + conditional_log_probas = np.zeros(matrix.matrix.shape[1]) for symptom in pertinent_positives: conditional_log_probas += np.log(matrix[symptom, :]) for symptom in pertinent_negatives: - conditional_log_probas -= np.log(matrix[symptom, :]) + conditional_log_probas += np.log(1 - matrix[symptom, :]) # Normalize - conditional_proba_energy = np.exp(conditional_log_probas) - conditional_probas = conditional_proba_energy / np.sum(conditional_proba_energy) - - return {condition: conditional_probas[i] for condition, i in matrix.columns.items()} + conditional_proba = np.exp(conditional_log_probas) + conditional_proba /= np.sum(conditional_proba) + return conditional_proba if __name__ == "__main__": - ... + condition_1 = datamodels.Condition(name="COVID-19", umls_code="C0000001") + condition_1_differential_symptom = datamodels.WeightedSymptom( + name="Cough", + umls_code="C0000001", + weight=0.5, + ) + + condition_2 = datamodels.Condition(name="Common Cold", umls_code="C0000004") + condition_2_differential_symptom = datamodels.WeightedSymptom( + name="Runny nose", + umls_code="C0000003", + weight=0.5, + ) + + common_symptom = datamodels.WeightedSymptom( + name="Fever", umls_code="C0000002", weight=0.5 + ) + db = datamodels.DiseaseSymptomKnowledgeBase( condition_symptoms={ - datamodels.Condition(name="COVID-19", umls_code="C0000001"): [ - datamodels.WeightedSymptom( - name="Fever", - umls_code="C0000002", - weight=0.5, - noise_rate=0.2, - ), - datamodels.WeightedSymptom( - name="Cough", - umls_code="C0000003", - weight=0.5, - noise_rate=0.1, - ), - ], - datamodels.Condition(name="Common Cold", umls_code="C0000004"): [ - datamodels.WeightedSymptom( - name="Fever", - umls_code="C0000002", - weight=0.5, - noise_rate=0.05, - ), - datamodels.WeightedSymptom( - name="Runny nose", - umls_code="C0000004", - weight=0.5, - noise_rate=0.01, - ), - ], + condition_1: [common_symptom, condition_1_differential_symptom], + condition_2: [common_symptom, condition_2_differential_symptom], } ) matrix = datamodels.DiseaseSymptomKnowledgeBaseTransformer.to_numpy(db) - conditional_probas = compute_condition_probabilities( + probas = compute_condition_posterior_flat_prior( matrix, - pertinent_positives=[ - datamodels.Symptom(name="Fever", umls_code="C0000002"), - datamodels.Symptom(name="Cough", umls_code="C0000003"), - ], - pertinent_negatives=[ - datamodels.Symptom(name="Runny nose", umls_code="C0000004"), - ], + [condition_1_differential_symptom, common_symptom], + [condition_2_differential_symptom], + ) + symptom_probas = compute_symptom_posterior_flat_prior( + matrix, + probas, ) - - print(conditional_probas) diff --git a/tests/test_conditional_proba.py b/tests/test_conditional_proba.py index f60312d..cf138e8 100644 --- a/tests/test_conditional_proba.py +++ b/tests/test_conditional_proba.py @@ -2,11 +2,11 @@ from dr_claude.mcts import base -def test_conditional_proba(): +def test_conditional_condition_proba(): condition_1 = datamodels.Condition(name="COVID-19", umls_code="C0000001") condition_1_differential_symptom = datamodels.WeightedSymptom( - name="Fever", - umls_code="C0000002", + name="Cough", + umls_code="C0000001", weight=0.5, ) @@ -28,10 +28,51 @@ def test_conditional_proba(): } ) matrix = datamodels.DiseaseSymptomKnowledgeBaseTransformer.to_numpy(db) - conditional_probas = base.compute_condition_probabilities( + conditional_probas = base.compute_condition_posterior_flat_prior_dict( matrix, pertinent_positives=[common_symptom, condition_1_differential_symptom], pertinent_negatives=[condition_2_differential_symptom], ) assert conditional_probas[condition_1] > conditional_probas[condition_2] + + +def test_conditional_proba_symptom(): + condition_1 = datamodels.Condition(name="COVID-19", umls_code="C0000001") + condition_1_differential_symptom = datamodels.WeightedSymptom( + name="Cough", + umls_code="C0000001", + weight=0.5, + ) + + condition_2 = datamodels.Condition(name="Common Cold", umls_code="C0000004") + condition_2_differential_symptom = datamodels.WeightedSymptom( + name="Runny nose", + umls_code="C0000004", + weight=0.5, + ) + + common_symptom = datamodels.WeightedSymptom( + name="Fever", umls_code="C0000002", weight=0.5 + ) + + db = datamodels.DiseaseSymptomKnowledgeBase( + condition_symptoms={ + condition_1: [common_symptom, condition_1_differential_symptom], + condition_2: [common_symptom, condition_2_differential_symptom], + } + ) + matrix = datamodels.DiseaseSymptomKnowledgeBaseTransformer.to_numpy(db) + condition_probas = base.compute_condition_posterior_flat_prior( + matrix, + pertinent_positives=[common_symptom, condition_1_differential_symptom], + pertinent_negatives=[condition_2_differential_symptom], + ) + symptom_probas = base.compute_symptom_posterior_flat_prior_dict( + matrix, condition_probas + ) + + assert ( + symptom_probas[condition_1_differential_symptom] + > symptom_probas[condition_2_differential_symptom] + )