
Merge pull request #13 from CanDIG/daisieh/test
DIG-1045: First pass at MoH model
daisieh authored Jun 14, 2023
2 parents 12f24e1 + bd7fb3e commit bbd19fc
Showing 12 changed files with 1,048 additions and 232 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -2,4 +2,4 @@ __pycache__/*
*.json
*.xlsx
*.xls
*.csv
.DS_Store
441 changes: 321 additions & 120 deletions CSVConvert.py

Large diffs are not rendered by default.

88 changes: 74 additions & 14 deletions README.md
@@ -1,38 +1,98 @@
# clinical_ETL_code

This repository converts input csv files with clinical (phenotypic) data into a json aligned with a provided openapi schema. You can provide custom mapping functions to transform data in your input file before writing to the json.

Specifically, this code was designed to convert clinical data for the MOHCCN project into the packet format needed for ingest into CanDIG's clinical data service (katsu).

## Set-up & Installation
Prerequisites:
- [Python 3.6+](https://www.python.org/)
- [pip](https://github.com/pypa/pip/)

You'll need to set up a free [account](https://bioportal.bioontology.org/account) at NCBO BioPortal to obtain an API key.

## Running from the command line

Most of the heavy lifting is done in the CSVConvert.py script. See sections below for setting up the inputs. This script:
* reads a file (.xlsx or .csv) or a directory of csv files
* reads a template file that contains a list of fields and (if needed) a mapping function for each
* for each field of each patient, applies the mapping function to transform the raw data into valid model data
* exports the data into json file(s) appropriate for ingest

```
$ python CSVConvert.py [-h] [--input INPUT] [--manifest manifest_file]
--input: path to dataset to be converted to the data model
--manifest: path to a manifest file with settings for the ETL
```

The output packets (`INPUT_map.json` and `INPUT_indexed.json`) will be in the parent of the `INPUT` directory / file.
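
For example, a typical invocation (paths here are hypothetical) might look like:

```
$ python CSVConvert.py --input path/to/cohort_csvs --manifest path/to/manifest.yml
```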

## Input file format

The input for CSVConvert is either a single xlsx file, a single csv, or a directory of csvs. If providing a spreadsheet, there can be multiple sheets (usually one for each sub-schema).

All rows must contain identifiers that allow linkage to the containing schema: for example, a row that describes a Treatment must have a link to the Donor / Patient id for that Treatment.

Data should be [tidy](https://r4ds.had.co.nz/tidy-data.html), with each variable in a separate column, each row representing an observation, and a single data entry in each cell.

Depending on the format of your raw data, you may need to write an additional tidying script to pre-process it. For example, `ingest_redcap_data.py` converts the export format from REDCap into a set of input csvs for CSVConvert.
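
As a minimal sketch of this kind of pre-processing (the column names and file paths below are illustrative, not part of this repository), a wide export could be split into one tidy csv per sub-schema with pandas:

```
# Hypothetical pre-processing sketch: split a wide raw export into one tidy
# csv per sub-schema so CSVConvert can read them from a single directory.
from pathlib import Path
import pandas as pd

raw = pd.read_csv("raw_export.csv", dtype=str)   # illustrative input file

Path("input_csvs").mkdir(exist_ok=True)

# Keep the linking identifier plus each sub-schema's own columns: one column
# per variable, one row per observation, one value per cell (tidy data).
donor_cols = ["submitter_donor_id", "date_of_birth", "gender"]
treatment_cols = ["submitter_donor_id", "submitter_treatment_id", "treatment_type"]

raw[donor_cols].drop_duplicates().to_csv("input_csvs/Donor.csv", index=False)
raw[treatment_cols].dropna(how="all").to_csv("input_csvs/Treatment.csv", index=False)
```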

## Setting up a cohort directory

For each dataset (cohort) that you want to convert, create a directory outside of this repository. For CanDIG devs, this will be in the private `data` repository. This cohort directory should contain:

* a `manifest.yml` file with settings for the mapping
* a template file that lists custom mappings for each field
* (if needed) a python file that implements any cohort-specific mapping functions

**Important:** If you are placing this directory under version control and the cohort is not sample / synthetic data, do not place raw or processed data files in this directory, to avoid any possibility of committing protected data.
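
A cohort directory might therefore look like this (all file names other than `manifest.yml` are hypothetical):

```
cohort_dir/
├── manifest.yml              # settings for the mapping
├── my_cohort_template.csv    # mapping template listing custom mappings per field
└── my_cohort_mappings.py     # cohort-specific mapping functions (if needed)
```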

## Manifest file
The `manifest.yml` file contains settings for the cohort mapping. There is a sample file in `sample_inputs/manifest.yml` with documentation. The fields are:

```
description: A brief description
mapping: the csv file that lists the mappings for each field
identifier: submitter_donor_id
schema: a URL to the openapi schema file
functions:
- cohort-mapping-functions
```
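
For illustration, a filled-in manifest might look like this (the mapping file and functions module names are hypothetical; the schema URL is the MoHCCN schema described below):

```
description: ETL mapping for the example cohort
mapping: my_cohort_template.csv
identifier: submitter_donor_id
schema: https://raw.githubusercontent.com/CanDIG/katsu/develop/chord_metadata_service/mohpackets/docs/schema.yml
functions:
  - my_cohort_mappings
```
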
## Mapping template

You'll need to create a mapping template that defines which mapping functions (if any) should be used for which fields.

The `generate_schema.py` script will generate a template file based on an openapi.yaml file. For using katsu with the current MoHCCN data model, the URL to the schema is https://raw.githubusercontent.com/CanDIG/katsu/develop/chord_metadata_service/mohpackets/docs/schema.yml (note: use the raw github url).

```
$ python generate_schema.py -h
usage: generate_schema.py [-h] --url URL [--out OUT]

options:
  -h, --help  show this help message and exit
  --url URL   URL to openAPI schema file (raw github link)
  --out OUT   name of output file; csv extension will be added. Default is template
```
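
For example, to generate a template for the MoHCCN schema mentioned above (the output name `moh_template` is only an example):

```
$ python generate_schema.py --url https://raw.githubusercontent.com/CanDIG/katsu/develop/chord_metadata_service/mohpackets/docs/schema.yml --out moh_template
```

This writes the resulting template to `moh_template.csv`.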

For each line in the mapping template, specify any mapping required to align your input data with the schema. See the [mapping instructions](mapping_functions.md) for detailed documentation on filling out the template.

**Note**: If your input data aligns perfectly with the schema (the column names are exact and unambiguous, and the field data matches the format specified by the schema), you do not need to modify the entry for that field.

**Note**: Do not edit, delete, or re-order the template lines, except to add mapping functions after the comma in each line.
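
As an illustration only (the calling convention for mapping functions is defined in [mapping_functions.md](mapping_functions.md), and the field and function names here are hypothetical), a cohort-specific function in the cohort's functions file might translate a local code into the value expected by the schema:

```
# Hypothetical cohort-specific mapping function. This sketch assumes a mapping
# function receives the raw value for a field and returns the value expected
# by the schema; see mapping_functions.md for the actual conventions.
def map_sex_at_birth(raw_value):
    """Translate a cohort-specific code into a schema-permitted value."""
    lookup = {"1": "Male", "2": "Female"}
    return lookup.get(str(raw_value).strip(), "Unknown")
```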

## Testing
Continuous integration testing for this repository is implemented through Pytest and GitHub Actions, which run on every push. Build results can be found at [this repository's GitHub Actions page](https://github.com/CanDIG/clinical_ETL_code/actions/workflows/test.yml).

To run the tests manually, run `pytest` from the command line.

<!-- # NOTE: the following sections have not been updated for current versions.
## Creating a dummy json file for testing
You can use a mohcode template file (created as described above) alone to create a dummy ingest file without actual data.
`python create_test_mapping.py` creates a JSON that is filled in (without using mapping functions) with placeholder or dummy values. You can specify the placeholder value with the argument `--placeholder`. If no template file is specified with `--template`, the current MCODE_SCHEMA of katsu is used and the JSON is outputted to stdout. Otherwise, the file is saved to `<template>_testmap.json`.
@@ -50,4 +110,4 @@ $ python CSVConvert.py [-h] [--input INPUT] [--mapping|manifest MAPPING]
This tool outputs information quantifying:
* how much of the schema is covered by the mapping
* how much of the dataset is covered by the mapping -->
123 changes: 60 additions & 63 deletions generate_schema.py
@@ -8,21 +8,20 @@
import mappings
import os
import pandas
import re
import sys
import yaml

import argparse
from chord_metadata_service.mcode.schemas import MCODE_SCHEMA
from schemas import candigv1_schema
from moh_mappings import mohschema
import re


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--schema', type=str, help="Schema to use for template; default is mCodePacket")
parser.add_argument('--url', type=str, help="URL to openAPI schema file (raw github link)", required=True)
parser.add_argument('--out', type=str, help="name of output file; csv extension will be added. Default is template", default="template")
args = parser.parse_args()
return args


def generate_mapping_template(node, node_name="", node_names=None):
"""Create a template for mcodepacket, for use with the --template flag."""
if node_names is None:
@@ -33,78 +32,76 @@ def generate_mapping_template(node, node_name="", node_names=None):
x = node_names.pop()
x_match = re.match(r"(.+?)\**,.*", x)
if x_match is not None:
if x_match.group(1) in node_name:
node_names.append(f"##{x}")
else:
if x_match.group(1) not in node_name:
node_names.append(x)
elif x.endswith(".INDEX,"):
node_names.append(x)
else:
node_names.append(x)
if "description" in node:
node_names.append(f"{node_name},\"##{node['description']}\"")
else:
node_names.append(f"{node_name},")
if "type" in node:
if node["type"] == "string":
return "string", node_names
elif node["type"] == "array":
new_node_name = ".".join((node_name, "0"))
sc, nn = generate_mapping_template(node["items"], new_node_name, node_names)
return [sc], nn
elif node["type"] in ["number", "integer"]:
return 0, node_names
elif node["type"] == "boolean":
return True, node_names
elif node["type"] == "object":
scaffold = {}
if "$id" in node:
scaffold["$id"] = node["$id"]
if len(node_names) > 0:
# if this is an ontology_class_schema, we'll update this data post-mapping
if "$id" in node and (node["$id"] == "katsu:common:ontology_class"
or node["$id"] == "katsu:mcode:complex_ontology"):
# add a + to the name of the node to denote that this needs to be looked up in an ontology
name = node_names.pop()
name_match = re.match(r"(.+?),(.+)", name)
if name_match is not None:
name = f"{name_match.group(1)}+,{name_match.group(2)}"
node_names.append(name)
return node["$id"], node_names
if "properties" in node:
for prop in node["properties"]:
if node_name == "":
new_node_name = prop
else:
new_node_name = ".".join((node_name, prop))
if "required" in node and prop in node["required"]:
new_node_name += "*"
scaffold[prop], node_names = generate_mapping_template(node["properties"][prop], new_node_name, node_names)
return scaffold, node_names
if "str" in str(type(node)):
return "string", node_names
elif "list" in str(type(node)):
new_node_name = ".".join((node_name, "INDEX"))
sc, nn = generate_mapping_template(node[0], new_node_name, node_names)
return [sc], nn
elif "number" in str(type(node)) or "integer" in str(type(node)):
return 0, node_names
elif "boolean" in str(type(node)):
return True, node_names
elif "dict" in str(type(node)):
scaffold = {}
for prop in node.keys():
if node_name == "":
new_node_name = prop
else:
new_node_name = ".".join((node_name, prop))
scaffold[prop], node_names = generate_mapping_template(node[prop], new_node_name, node_names)
return scaffold, node_names
else:
return {}, node_names
return str(type(node)), node_names
return None, node_names

def main(args):
schema = args.schema
metadata = ""

url = args.url
schema = mohschema(url)
if schema is None:
schema = MCODE_SCHEMA
# get metadata about version of MCODE_SCHEMA used:
metadata += "## schema based on version " + version('katsu') + ",\n"
direct_url = [p for p in files('katsu') if 'direct_url.json' in str(p)]
if len(direct_url) > 0:
d = json.loads(direct_url[0].read_text())
metadata += f"## directly checked out from {d['url']}, commit {d['vcs_info']['commit_id']}\n"
if schema == "candigv1":
schema = candigv1_schema
sc, node_names = generate_mapping_template(schema)

with open(f"{template}.csv", 'w') as f: # write to csv file for mapping
print("Did not find an openapi schema at {}; please check link".format(url))
return
schema_array = schema.generate_schema_array()

outputfile = "{}.csv".format(args.out)
# print(f"Outputting schema template to {outputfile}")
# with open(outputfile,'w') as f:
# f.write("# Schema generated from {}\n".format(url))
# f.write("# mohschema.fieldname,mapping_function\n")
# f.writelines(schema_array)


metadata = ""

# if schema is None:
# schema = MCODE_SCHEMA
# # get metadata about version of MCODE_SCHEMA used:
# metadata += "## schema based on version " + version('katsu') + ",\n"
# direct_url = [p for p in files('katsu') if 'direct_url.json' in str(p)]
# if len(direct_url) > 0:
# d = json.loads(direct_url[0].read_text())
# metadata += f"## directly checked out from {d['url']}, commit {d['vcs_info']['commit_id']}\n"
# if schema == "candigv1":
# schema = candigv1_schema
sc, node_names = generate_mapping_template(schema_array["DonorWithClinicalData"])

with open(outputfile, 'w') as f: # write to csv file for mapping
f.write(metadata)
f.write("## mcodepacket element, description (overwrite with mapped element)\n")
f.write("## (.0 is an array element) (* is required) (+ denotes ontology term),\n")
f.write("## mohpacket element, description (overwrite with mapped element)\n")
# f.write("## (.INDEX is an array element) (* is required) (+ denotes ontology term),\n")
for nn in node_names:
f.write(f"{nn}\n")
print(f"Template written to {outputfile}")
return

if __name__ == '__main__':
102 changes: 102 additions & 0 deletions ingest_redcap_data.py
@@ -0,0 +1,102 @@
"""
Methods to transform the redcap raw data into the csv format expected by
CSVConvert.py
"""

import os
import argparse
import re
import pandas
import json
from pathlib import Path

def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--input', type=str, required = True, help="Raw csv output from Redcap")
parser.add_argument('--verbose', '--v', action="store_true", help="Print extra information")
parser.add_argument('--output', type=str, default="tmp_out", help="Optional name of output directory in same directory as input; default tmp_out")
args = parser.parse_args()
return args

def ingest_redcap_files(file):
"""Test of ingest of redcap output files"""
raw_csv_dfs = {}
file_match = re.match(r"(.+)\.csv$", file)
if file_match is not None:
print(f"Reading input file {file}")
try:
df = pandas.read_csv(file, dtype=str, encoding = "latin-1")
#print(f"initial df shape: {df.shape}")
# find and drop empty columns
df = drop_empty_columns(df)
            # now we do some renaming, because for reasons we don't understand
# the program_id and submitter_donor_id columns are swapped
df.rename(columns={'program_id':'tempname'},inplace=True)
df.rename(columns={'submitter_donor_id':'program_id'},inplace=True)
df.rename(columns={'tempname':'submitter_donor_id'},inplace=True)
raw_csv_dfs[file] = df
except Exception as e:
raise Exception(f"File {file} does not seem to be a valid csv file")
else:
raise Exception(f"File {file} does not seem to be a csv file")
return raw_csv_dfs

def extract_repeat_instruments(df):
""" Transforms the single (very sparse) dataframe into one dataframe per
MoH schema. This makes it easier to look at, and also eliminates a bunch
of pandas warnings."""
new_dfs={}
starting_rows = df.shape[0]
repeat_instruments = df['redcap_repeat_instrument'].dropna().unique()
total_rows = 0
for i in repeat_instruments:
# each row has a redcap_repeat_instrument that describes the schema
# (e.g. Treatment) and a redcap_repeat_instance that is an id for that
# schema (this would be the treatment.id)
print(f"Extracting schema {i}")
schema_df = df.loc[df['redcap_repeat_instrument'] == i]
        # drop all of the empty columns that aren't relevant for this schema
schema_df = drop_empty_columns(schema_df)
# rename the redcap_repeat_instance to the specific id (e.g. treatment_id)
schema_df.rename(columns={
'redcap_repeat_instance': f"{i}_id"
},
inplace=True
)
total_rows += schema_df.shape[0]
new_dfs[i]=schema_df

# now save all of the rows that aren't a repeat_instrument and
# label them Singleton for now
singletons = df.loc[df['redcap_repeat_instrument'].isnull()]
singletons = drop_empty_columns(singletons)
# check that we have all of the rows
if (total_rows + singletons.shape[0] < starting_rows):
print("Warning: not all rows recovered in raw data")
new_dfs['Singleton']=singletons
return new_dfs

def drop_empty_columns(df):
empty_cols = [col for col in df if df[col].isnull().all()]
df = df.drop(empty_cols, axis=1)
return df

def output_dfs(input_path,output_dir,df_list):
parent_path = Path(input_path).parent
tmpdir = Path(parent_path,output_dir)
if not tmpdir.is_dir():
tmpdir.mkdir()
print(f"Writing output files to {tmpdir}")
for d in df_list:
df_list[d].to_csv(Path(tmpdir,f"{d}.csv"), index=False)

def main(args):
input_path = args.input

raw_csv_dfs = ingest_redcap_files(input_path)
new_dfs = extract_repeat_instruments(raw_csv_dfs[input_path])
output_dir = args.output
output_dfs(input_path,output_dir,new_dfs)

if __name__ == '__main__':
main(parse_args())