From f46c7b7a7ece3e1cc0423f0e08bf3b037749280a Mon Sep 17 00:00:00 2001 From: Bryan Larson Date: Thu, 29 Oct 2020 16:56:17 -0400 Subject: [PATCH] feat(ppm/fhir/auth): PPM-532 - RANT Questionnaire handling improvements; added authorization support for apps; minor tweaks and fixes --- ppmutils/auth.py | 53 +++++++ ppmutils/fhir.py | 372 ++++++++++++++++++++++++++++++++++++++--------- ppmutils/ppm.py | 13 +- 3 files changed, 365 insertions(+), 73 deletions(-) create mode 100644 ppmutils/auth.py diff --git a/ppmutils/auth.py b/ppmutils/auth.py new file mode 100644 index 0000000..e58262d --- /dev/null +++ b/ppmutils/auth.py @@ -0,0 +1,53 @@ +from ppmutils.ppm import PPM + +import logging + +logger = logging.getLogger(__name__) + + +class Auth(object): + + ITEM = "ppm" + ADMIN = "admin" + VIEW = "view" + PERMISSIONS = { + "HEAD": [ADMIN, VIEW], + "OPTION": [ADMIN, VIEW], + "GET": [ADMIN, VIEW], + "POST": [ADMIN], + "PATCH": [ADMIN], + "PUT": [ADMIN], + "DELETE": [ADMIN], + } + + @classmethod + def has_permission(cls, method, permissions, study=None): + """ + Inspects the set of permissions and returns True if permissions + contain admin level permissions. If a study is passed, this method + returns True if permissions are admin on PPM or on study. + + :param method: The requested method to check permissions for + :type method: str + :param permissions: A list of permissions from DBMI-AuthZ + :type permissions: list + :param study: A specific study, defaults to None + :type study: str, optional + """ + + # Map permissions + map = {a["item"].lower(): a["permission"].lower() for a in permissions} + + # Check for site level permissions first + if map.get(cls.ITEM) in cls.PERMISSIONS[method]: + return cls.ITEM, map[cls.ITEM] + + # Check study, if passed + if study: + + # Set the item string + item = f"{cls.ITEM}.{PPM.Study.get(study).value}" + if map.get(item) in cls.PERMISSIONS[method]: + return item, map[item] + + return None, None diff --git a/ppmutils/fhir.py b/ppmutils/fhir.py index 07a6e30..65bbec6 100644 --- a/ppmutils/fhir.py +++ b/ppmutils/fhir.py @@ -6,6 +6,7 @@ import random import re import base64 +import string from dateutil.parser import parse from dateutil.tz import tz from datetime import datetime, date @@ -1375,8 +1376,6 @@ def _query_bundle(resource_type, query=None): :return: A Bundle of FHIR resources :rtype: Bundle """ - logger.debug("Query resource: {} : {}".format(resource_type, query)) - # Build the URL. url_builder = furl(PPM.fhir_url()) url_builder.path.add(resource_type) @@ -2204,10 +2203,81 @@ def query_enrollment_status(email): return None @staticmethod - def query_ppm_research_studies(email, flatten_return=True): + def query_ppm_participant_details(patient): + """ + Fetches and returns basic details on the Patient and any PPM studies + they're participating in. + + :param fhir_id: Patient identifier + :type fhir_id: str + :return: A tuple of Patient object and a list of study codes + :rtype: dict, list + """ + # Get flags for current user + query = { + "identifier": "{}|".format(FHIR.research_subject_identifier_system), + "_include": "ResearchSubject:individual", + } + + # Update for the patient query + query.update(FHIR._patient_resource_query(patient, "individual")) + + # Get the resources + bundle = FHIR._query_bundle("ResearchSubject", query=query) + + # Get study IDs + research_study_ids = [ + s.resource.study.reference.split("/")[1] + for s in bundle.entry + if s.resource.resource_type == "ResearchSubject" + ] + + # Remove 'ppm-' prefix and return + return FHIR.flatten_patient(bundle.as_json()), [ + research_study_id.replace("ppm-", "") for research_study_id in research_study_ids + ] + + @staticmethod + def query_ppm_research_study_codes(patient): + """ + Fetches all PPM-managed ResearchStudy resources the passed patient + is participating and returns the study codes. + + :param patient: Patient identifier (email, PPM ID, Patient object), defaults to None + :type patient: str + :return: A list of ResearchStudy codes + :rtype: list + """ # Find Research subjects (without identifiers, so as to exclude PPM resources) - research_subjects = FHIR.query_ppm_research_subjects(email, flatten_return=False) + research_subjects = FHIR.query_ppm_research_subjects(patient, flatten_return=False) + + if not research_subjects: + logger.debug("No Research Subjects, no Research Studies") + return None + + # Get study IDs + research_study_ids = [subject["study"]["reference"].split("/")[1] for subject in research_subjects] + + # Remove 'ppm-' prefix and return + return [research_study_id.replace("ppm-", "") for research_study_id in research_study_ids] + + @staticmethod + def query_ppm_research_studies(patient=None, flatten_return=True): + """ + Fetches all PPM-managed ResearchStudy resources. If given, the results + will be limited to those the passed patient is currently participating in + + :param patient: Patient identifier (email, PPM ID, Patient object), defaults to None + :type patient: str, optional + :param flatten_return: Flatten FHIR resource, defaults to True + :type flatten_return: bool, optional + :return: A list of ResearchStudy resources, in FHIR format or simplified depending on flatten_return argument + :rtype: list + """ + + # Find Research subjects (without identifiers, so as to exclude PPM resources) + research_subjects = FHIR.query_ppm_research_subjects(patient, flatten_return=False) if not research_subjects: logger.debug("No Research Subjects, no Research Studies") @@ -3568,7 +3638,7 @@ def delete_consent(patient_id, project): ) # Check project - if project == "autism": + if PPM.Project.get(project) is PPM.Study.ASD: questionnaire_ids = [ "ppm-asd-consent-guardian-quiz", @@ -4308,11 +4378,21 @@ def flatten_questionnaire_response(bundle_dict, questionnaire_id): key=lambda q: int(q[0].split("-")[1]), ) ) + + # Determine index + indices = FHIR.get_answer_indices(questions) for linkId, question in top_questions.items(): # Check for the answer answer = answers.get(linkId) if not answer: + + # Check if dependent and enabled + if FHIR.question_is_conditionally_enabled( + questionnaire, linkId + ) and not FHIR.questionnaire_response_is_enabled(questionnaire, questionnaire_response, linkId): + continue + answer = [mark_safe('N/A')] # Check if dependent and was enabled (or should have an answer but doesn't) @@ -4327,7 +4407,7 @@ def flatten_questionnaire_response(bundle_dict, questionnaire_id): ) # Format the question text - text = "{}. {}".format(len(response.keys()) + 1, question) + text = "{} {}".format(indices.get(linkId), question) # Add the answer response[text] = answer @@ -4342,6 +4422,67 @@ def flatten_questionnaire_response(bundle_dict, questionnaire_id): "responses": response, } + @staticmethod + def get_answer_indices(questions): + """ + Returns a mapping of a QuestionnaireItem linkId to the string to be + used as its indexing in the listing of questions and answers in a + view. This method relies on sequential linkIds being used and also + manages child-questions indexing using letters, etc. + + Top-level questions: {item type}-3 -> "3. Some answer" + Second-level questions: {item-type}-2-4 -> "2. d. Some answer" + Third-level questions: {item-type}-10-1-3 -> "10. a. iii. Some answer" + + :param questions: The dictionary of linkIds to question text + :type questions: dict + :return: The mapping of linkId to answer index text. + with linkId + :rtype: list + """ + + # Create a mapping of linkId to index + indices = {} + offset = 0 + + # Iterate questions + for linkId, question in questions.items(): + + # Parse it up + r = re.compile(r"-?([\d]+)") + parts = r.findall(linkId) + + if len(parts) >= 1: + + # Check for an offset (NEER starts at 5) + if not indices and int(parts[0]) > 1: + offset = 1 - int(parts[0]) + elif not indices and int(parts[0]) == 0: + offset = 1 + + # Set it + index = int(parts[0]) + offset + indices[linkId] = f"{index}. " + + if len(parts) >= 2: + + letter_index = int(parts[1]) + letter = string.ascii_lowercase[letter_index - 1] + indices[linkId] = f"{indices[linkId]}{letter}. " + + if len(parts) == 3: + + # Ensure we have siblings + if len([l for l in questions.keys() if linkId.rsplit(parts[2], 1)[0] in l]) == 1: + continue + + i_count = int(parts[2]) + indices[linkId] = f'{indices[linkId]}{"i" * i_count}. ' + + # Check for an offset + + return indices + @staticmethod def find_questionnaire_item(questionnaire_items, linkId): """Returns the questionnaire item object for the given linkId @@ -4421,10 +4562,80 @@ def get_questionnaire_response_item_answer(questionnaire_response_items, linkId) return answers if len(answers) > 1 else next(iter(answers), None) @staticmethod - def questionnaire_response_is_required(questionnaire, questionnaire_response, linkId, parent=None): + def get_question_path(questionnaire, linkId, parent=None): + """ + Returns a list of QuestionnaireItem objects that form the lineage + of parents of the requested item by linkId. The first item in the list + is at the top-most level followed by second level, and so on until + the requested item, at whatever level it exists. + + :param questionnaire: The questionnaire being processed + :type questionnaire: Questionnaire + :param linkId: The link ID of the question to check + :type linkId: str + :return: The list of QuestionnaireItems forming a path to the question + with linkId + :rtype: list + """ + # Build items + items = [parent] if parent else [] + + # If not parent, start with root of Questionnaire + if not parent: + parent = questionnaire + + # Iterate items and break as soon as we find the correct node or path + for item in parent.item: + + # Compare + if item.linkId == linkId: + + # Check what to return + items.append(item) + break + + # Check for subitems + subitems = item.item and FHIR.get_question_path(questionnaire, linkId, parent=item) + if subitems: + items.extend(subitems) + break + + else: + # If not found, return empty list + return [] + + # Return items + return items + + @staticmethod + def question_is_conditionally_enabled(questionnaire, linkId): + """ + Returns whether the passed question is conditionally enabled or not. + Conditionally enabled means this QuestionnaireItem or one of its + antecedents has an enableWhen property that determines when and if + it and its descendents are enabled. + + :param questionnaire: The questionnaire being processed + :type questionnaire: Questionnaire + :param linkId: The link ID of the question to check + :type linkId: str + :return: Whether it is conditionally enabled or not + :rtype: boolean + """ + # Find first condition + for item in FHIR.get_question_path(questionnaire, linkId): + + # Check this and parent for enableWhen + if item.enableWhen: + return True + + return False + + @staticmethod + def questionnaire_response_is_enabled(questionnaire, questionnaire_response, linkId, parent=None): """ Inspects the Questionnaire for the given link ID and returns whether it is - conditionally required or not based on responses given. + conditionally enabled or not based on responses given. Args: questionnaire (Questionnaire): The Questionnaire object containing being responded to @@ -4436,78 +4647,91 @@ def questionnaire_response_is_required(questionnaire, questionnaire_response, li parent (QuestionnaireItem): The parent questionnaire item for recursive searches Returns: - [bool]: Returns True if this item is required and was enabled else False + [bool]: Returns True if this item is enabled else False """ - # If not parent, start with root of Questionnaire - if not parent: - parent = questionnaire - - # Find it first - for item in parent.item: + # Compile list of conditions + enable_whens = [] + + # Find first condition + for item in FHIR.get_question_path(questionnaire, linkId): + + # Check this and parent for enableWhen + if item.enableWhen: + enable_whens.extend(item.enableWhen) + + # Iterate conditions and check for failures + for enable_when in enable_whens: + + # Get their answer as a list, if not already + answer = FHIR.get_questionnaire_response_item_answer(questionnaire_response.item, enable_when.question) + if type(answer) is not list: + answer = [answer] + + # Check operation (this is not available on FHIR R3 and below) + enable_when_operation = getattr(enable_when, "operation", "=") + + # Check equality of condition and answer + if enable_when_operation == "=": + + # Check for answer type and check if it's in their list of items + if enable_when.answerString is not None: + if enable_when.answerString not in answer: + return False + elif enable_when.answerBoolean is not None: + if enable_when.answerBoolean not in answer: + return False + elif enable_when.answerDate is not None: + if enable_when.answerDate.isostring not in answer: + return False + elif enable_when.answerDateTime is not None: + if enable_when.answerDateTime.isostring not in answer: + return False + elif enable_when.answerInteger is not None: + if enable_when.answerInteger not in answer: + return False + else: + logger.error(f"PPM/FHIR: Unhandled enableWhen answer type: {enable_when.as_json()}") + return False - # Compare - if item.linkId == linkId: + else: + logger.error(f"PPM/FHIR: Unhandled enableWhen operation type: {enable_when.as_json()}") + return False - # If not required, return right away - if not getattr(item, "required", False): - return False + # If we are here, this item is enabled as dependencies are satisfied (if any) + return True - # Check this and parent for enableWhen - if item.enableWhen or getattr(parent, "enableWhen", False): + @staticmethod + def questionnaire_response_is_required(questionnaire, questionnaire_response, linkId, parent=None): + """ + Inspects the Questionnaire for the given link ID and returns whether it is + conditionally required or not based on responses given. - # Compile list of conditions on this item as well as parent item - enable_whens = item.enableWhen if item.enableWhen else [] + getattr(parent, "enableWhen", []) + Args: + questionnaire (Questionnaire): The Questionnaire object containing being responded to + questionnaire_responses (QuestionnaireResponse): The QuestionnaireResponse object containing + all responses + linkId (str): The linkId of the item we are looking for - # Iterate conditions and check for failures - for enable_when in enable_whens: + Keyword Args: + parent (QuestionnaireItem): The parent questionnaire item for recursive searches - # Get their answer as a list, if not already - answer = FHIR.get_questionnaire_response_item_answer( - questionnaire_response.item, enable_when.question - ) - if type(answer) is not list: - answer = [answer] - - # Check operation (this is not available on FHIR R3 and below) - enable_when_operation = getattr(enable_when, "operation", "=") - - # Check equality of condition and answer - if enable_when_operation == "=": - - # Check for answer type and check if it's in their list of items - if enable_when.answerString is not None: - if enable_when.answerString not in answer: - return False - elif enable_when.answerBoolean is not None: - if enable_when.answerBoolean not in answer: - return False - elif enable_when.answerDate is not None: - if enable_when.answerDate.isostring not in answer: - return False - elif enable_when.answerDateTime is not None: - if enable_when.answerDateTime.isostring not in answer: - return False - elif enable_when.answerInteger is not None: - if enable_when.answerInteger not in answer: - return False - else: - logger.error(f"PPM/FHIR: Unhandled enableWhen answer type: {enable_when.as_json()}") - return False + Returns: + [bool]: Returns True if this item is required and was enabled else False + """ - else: - logger.error(f"PPM/FHIR: Unhandled enableWhen operation type: {enable_when.as_json()}") - return False + # Get the question + item = FHIR.find_questionnaire_item(questionnaire.item, linkId) - # If we are here, this item is required and all dependencies are satisfied (if any) - return True + # If not required, return right away + if not getattr(item, "required", False): + return False - # Check for subitems - if item.item and FHIR.questionnaire_response_is_required( - questionnaire, questionnaire_response, linkId, parent=item - ): - return True + # Check this and parent for enableWhen + if not FHIR.questionnaire_response_is_enabled(questionnaire, questionnaire_response, linkId): + return False - return False + # If we are here, this item is required and all dependencies are satisfied (if any) + return True @staticmethod def _questions(items): @@ -5406,12 +5630,18 @@ def ppm_research_study(project, title): } # Hard code dates - if "neer" in project: + if PPM.Study.NEER.value in project: data["period"] = {"start": "2018-05-01T00:00:00Z"} - elif "autism" in project: + elif PPM.Study.ASD.value in project: data["period"] = {"start": "2017-07-01T00:00:00Z"} + elif PPM.Study.RANT.value in project: + data["period"] = {"start": "2020-11-01T00:00:00Z"} + + elif PPM.Study.EXAMPLE.value in project: + data["period"] = {"start": "2020-01-01T00:00:00Z"} + return data @staticmethod diff --git a/ppmutils/ppm.py b/ppmutils/ppm.py index 1433fac..838d7ef 100644 --- a/ppmutils/ppm.py +++ b/ppmutils/ppm.py @@ -14,6 +14,14 @@ logger = logging.getLogger(__name__) +# Adding this decorator to an Enum allows it to +# be passed into a Django context and its members +# accessed via name and value as well as being iterated (untested) +def django_enum(cls): + cls.do_not_call_in_templates = True + return cls + + class PPMEnum(Enum): """ An extended Enum class with some convenience methods for working with @@ -132,9 +140,10 @@ def is_tester(email): return False + @django_enum class Study(PPMEnum): NEER = "neer" - ASD = "autism" + ASD = "asd" EXAMPLE = "example" RANT = "rant" @@ -239,7 +248,7 @@ def choices(cls): """ return ( (PPM.Study.NEER.value, "NEER"), - (PPM.Study.ASD.value, "Autism"), + (PPM.Study.ASD.value, "ASD"), (PPM.Study.EXAMPLE.value, "Example"), (PPM.Study.RANT.value, "RANT"), )