Skip to content

Modify an existing pipeline

magosil86 edited this page Sep 11, 2015 · 8 revisions

Extending the PLINK QC pipeline to include basic population structure analysis

Background

The PLINK QC pipeline is comprised of three sub -pipelines:

  1. pipeline_qcplink_tasks1-5of20.py
  2. pipeline_qcplink_tasks6-14of20.py
  3. pipeline_qcplink_tasks15-20of20.py

This guide will focus on extending the pipeline_qcplink_tasks15-20of20.py sub -pipeline

Goal: Run PCA (principal component analysis) to determine whether the cases and controls are from similar populations?

Pipeline files needed to extend the PLINK QC pipeline

  1. pipeline_qcplink_tasks15-20of20.py
  2. pipeline_qcplink_tasks15-20of20_config.py
  3. pipeline_qcplink_tasks15-20of20_stages_config.py
  4. PlinkUserInput.py

Tasks to be added to the pipeline_qcplink_tasks15-20of20.py sub -pipeline

  1. Prunning the qced_plink.{fam, bim, bed} files to remain with a single SNP per LD (linkage disequilibrium) block
  2. Extract the .prune.in SNPs
  3. Run PCA
  4. Determine significant pcs with twstats

Understanding Ruffus pipeline tasks

Zoom in on the web browser to get a better view of the images pipeline_qcplink_tasks15-20_ruffus_excerpt8

Ruffus pipeline tasks can be viewed as having four main sections:

  1. Goal of the task is typically written in the form a -> b. This symbolizes the idea that file_a will be used to generate file_b

  2. Ruffus decorators chain together the tasks in the pipeline and outline the order in which tasks should be carried out.

  • The @transform decorator takes the Output(s) from a previous task and plugs them in as Input(s) for the next, thereby linking the two tasks together.
  • The @follows decorator specifies task dependency e.g. instead of executing task5 after task4, @follows can be invoked to ensure that task5 will run immediately after task3.
  1. The main function (a regular python function) parses the inputs and outputs received from the @transform decorator, assigning the .Success file to the flagFile variable and other input and/or output values to their respective variables. Finally the main function name and the input and output variables required by the main function are passed to runStageCheck as parameters.

  2. A call to runStageCheck prompts Rubra to generate a PBS shell script in order to send the job/task to the batch queue system (i.e. cluster).

See Ruffus manual

Keypoint: Ruffus pipelines are modularized hence each task will only have one main function.

Phase1- Editing pipeline_qcplink_tasks15-20of20.py (Ruffus pipeline)

pipeline_qcplink_tasks15-20_ruffus_excerpt1

pipeline_qcplink_tasks15-20_ruffus_excerpt2

<< Adding two new variables called: fully_qced_plink_bfiles and qced_plink_pruned >>

# path to plink binary files resulting from PLINK QC
fully_qced_plink_bfiles = (os.path.join(witsGWAS_SCRIPTS_ROOT_DIR, SC.CURRENT_PROJECT_DIR) + "qced_plink")

# path to plink binary files resulting from prunning PLINK QCed files
qced_plink_pruned_bfiles = (os.path.join(witsGWAS_SCRIPTS_ROOT_DIR, SC.CURRENT_PROJECT_DIR) + "qced_plink_pruned")

pipeline_qcplink_tasks15-20_ruffus_excerpt3

pipeline_qcplink_tasks15-20_ruffus_excerpt4

pipeline_qcplink_tasks15-20_ruffus_excerpt5

pipeline_qcplink_tasks15-20_ruffus_excerpt6

pipeline_qcplink_tasks15-20_ruffus_excerpt7

<< Adding the four new tasks (Prunning, extraction and PCA) to the pipeline >>

Appending task 1 of 4: Prunning

First step: Write out the goal of the task

#Task20.2: qced_plink.{fam, bim, bed} -> qced_plink_pruned.prune.in                       # Goal of the task

Second step: Chain/link the new task to the previous task

Note: We are happy with this task occurring immediately after task20.1 hence no need for @follows

@transform(remove_xchr_SNPs,                                                              # Input prefix      = qcplink                                                              
           suffix('.RemoveXchrSNPs.Success'),                                             # Input suffix      = .RemoveXchrSNPs.Success                                              
           '.QcedPlinkPruneIn.Success')                                                   # Output suffix     = .QcedPlinkPruneIn.Success                                            

Third step: Write out the main function that will do the actual work of the task

def prune_qced_plink_for_pca(inputs, outputs):
    """                                                                                                                                                                              
    Minimize computational complexity prior to carrying out PCA by pruning the SNPs; such that                                                                                       
    no pair of SNPs within a 50-SNP sliding window (advanced by 5 SNPs each time) has an                                                                                             
    r_squared value greater than 0.2                                                                                                                                                 
    """

    flagFile = outputs
    print ("pruning the SNPs such that no pair of SNPs within a 50-SNP sliding window. \n"
    "(advanced by 5 SNPs each time) has an r_squared value greater than 0.2")
    runStageCheck('prune_qced_plink_for_pca', flagFile, fully_qced_plink_bfiles)
    print "Task 20.2 completed successfully \n"

End product from combining steps 1, 2 and 3:

#Task20.2: qced_plink.{fam, bim, bed} -> qced_plink_pruned.prune.in                       # Goal of the task                                                                         

@transform(remove_xchr_SNPs,                                                              # Input prefix      = qcplink                                                              
           suffix('.RemoveXchrSNPs.Success'),                                             # Input suffix      = .RemoveXchrSNPs.Success                                              
           '.QcedPlinkPruneIn.Success')                                                   # Output suffix     = .QcedPlinkPruneIn.Success                                            

def prune_qced_plink_for_pca(inputs, outputs):
    """                                                                                                                                                                              
    Minimize computational complexity prior to carrying out PCA by pruning the SNPs; such that                                                                                       
    no pair of SNPs within a 50-SNP sliding window (advanced by 5 SNPs each time) has an                                                                                             
    r_squared value greater than 0.2                                                                                                                                                 
    """

    flagFile = outputs
    print ("pruning the SNPs such that no pair of SNPs within a 50-SNP sliding window. \n"
    "(advanced by 5 SNPs each time) has an r_squared value greater than 0.2")
    runStageCheck('prune_qced_plink_for_pca', flagFile, fully_qced_plink_bfiles)
    print "Task 20.2 completed successfully \n"

Key points:

  1. Each task is referred to by its main function name hence the task created above will be called prune_qced_plink_for_pca
  2. The command that the main function carries out will be specified in the file: pipeline_qcplink_tasks15-20of20_stages_config.py
  3. By invoking runStageCheck, the command corresponding to the prune_qced_plink_for_pca task in pipeline_qcplink_tasks15-20of20_stages_config.py will receive the variables: flagFile and fully_qced_plink_bfiles as its inputs.
  4. Global variables e.g. fully_qced_plink_bfiles need NOT be passed through the main function as parameters for them to be used in runStageCheck

Appending task 2 of 4: Extract the .prune.in SNPs

#Task20.3: qced_plink_pruned.prune.in -> qced_plink_pruned.{fam, bim, bed}                # Goal of the task                                                                         

@transform(prune_qced_plink_for_pca,                                                      # Input prefix      = qcplink                                                              
           suffix('.QcedPlinkPruneIn.Success'),                                           # Input suffix      = .QcedPlinkPruneIn.Success                                            
           add_inputs("qced_plink_pruned.prune.in"),                                      # add_inputs        = qced_plink_pruned.prune.in                                           
           '.QcedPlinkPruned.Success')                                                    # Output suffix     = .QcedPlinkPruned.Success                                             

def extract_prunein_qcedplink_for_pca(inputs, outputs):
    """                                                                                                                                                                              
    Generate a subset of SNPs that are in approximate LD (Linkage disequilibrium) with each other                                                                                    
    in preparation for running PCA                                                                                                                                                   
    """

    prune_in_snps = inputs[1]                                                             # assigning qced_plink_pruned.prune.in to prune_in_snps                                    

    flagFile = outputs
    print ("Generating a subset of SNPs that are in approximate LD (Linkage disequilibrium) \n"
    " with each other in preparation for running PCA")
    runStageCheck('extract_prunein_qcedplink_for_pca', flagFile, fully_qced_plink_bfiles, prune_in_snps)
    print "Task 20.3 completed successfully \n"

Note: Introducing additional inputs means that the inputs parameter in the main function extract_prunein_qcedplink_for_pca is now a list: i.e. [qcplink.QcedPlinkPruneIn.Success, qced_plink_pruned.prune.in]

Appending task 3 of 4: Run PCA on qced PLINK binaries

#Task20.4: qced_plink_pruned.{fam, bim, bed} -> qced_plink_pruned.pca.evec                # Goal of the task                                                                         

@transform(extract_prunein_qcedplink_for_pca,                                             # Input prefix      = qcplink                                                              
           suffix('.QcedPlinkPruned.Success'),                                            # Input suffix      = .QcedPlinkPruned.Success                                             
           '.QcedPlinkPrunedPCA.Success')                                                 # Output suffix     = .QcedPlinkPrunedPCA.Success                                          

def do_pca_on_pruned_qcedplink(inputs, outputs):

    """                                                                                                                                                                              
    Run PCA on qced_plink_pruned.{fam, bim, bed} to determine whether the cases and                                                                                                  
    controls are from similar populations.                                                                                                                                           
    """

    flagFile = outputs
    print ("Run PCA on qced_plink_pruned.{fam, bim, bed} to determine whether the cases and \n"
    " controls are from similar populations")
    runStageCheck('do_pca_on_pruned_qcedplink', flagFile)
    print "Task 20.4 completed successfully \n"

Appending task 4 of 4: Determine significant pcs with twstats

#Task20.5: qced_plink_pruned.eval -> qced_plink_pruned.tw                                 # Goal of the task

@transform(do_pca_on_pruned_qcedplink,                                                    # Input prefix      = qcplink                                                              
           suffix('.QcedPlinkPrunedPCA.Success'),                                         # Input suffix      = .QcedPlinkPrunedPCA.Success
           add_inputs("qced_plink_pruned.eval"),                                          # add inputs        =  qced_plink_pruned.pca.eval                                          
           '.QcedPlinkPCATwstats.Success')                                                # Output suffix     = .QcedPlinkPCATwstats.Success
           
def do_twstats_on_pca_results(inputs, outputs):

    """                                                                                                                                                                              
    Which eigenvectors are significant?.
    Used twstats to assess if there is significant structure in each PC.                                                                                                  
    """
    
    qcedplink_pca_eval = inputs[1]

    flagFile = outputs
    print "Using twstats to assess if there is significant structure in each PC"
    runStageCheck('do_twstats_on_pca_results', flagFile, qcedplink_pca_eval)
    print "Task 20.5 completed successfully \n"

Saving

Save the extended pipeline_qcplink_tasks15-20of20.py file as pipeline_qcplink_tasks15-20of20_extended.py

Phase2- Editing pipeline_qcplink_tasks15-20of20_config.py (Pipeline config)

pipeline_qcplink_tasks15-20_config_excerpt1

Note: The values supplied to the keys in the working_files and preselected_cutoff dictionaries are supplied by the user via the PlinkUserInput.py file. Our extensions do not require any user input therefore, we shall not be adding any new key-value pairs to these (working_files and preselected_cutoff) dictionaries.

pipeline_qcplink_tasks15-20_config_excerpt2

Saving

Save the extended pipeline_qcplink_tasks15-20of20_config.py file as pipeline_qcplink_tasks15-20of20_config_extended.py

Phase3- Editing pipeline_qcplink_tasks15-20of20_stages_config.py (Pipeline stages config)

pipeline_qcplink_tasks15-20_config_excerpt3

pipeline_qcplink_tasks15-20_config_excerpt4

<< Adding the commands for the four new tasks to pipeline_qcplink_tasks15-20of20_stages_config.py >>

pipeline_qcplink_tasks15-20_config_excerpt5

Command for the: prune_qced_plink_for_pca task

'prune_qced_plink_for_pca': {
                        'command': plink + " --bfile %fully_qced_plink_bfiles --indep-pairwise 50 5 0.2 --out qced_plink_pruned"
                },

Command for the: extract_prunein_qcedplink_for_pca task

'extract_prunein_qcedplink_for_pca': {
                        'command': plink + " --bfile %fully_qced_plink_bfiles --extract %prune_in_snps --make-bed --out qced_plink_pruned"
                },

Command for the: do_pca_on_pruned_qcedplink task

'do_pca_on_pruned_qcedplink': {
                        'command': "bash ../../runpca.sh qced_plink_pruned"
                },

Command for the: do_twstats_on_pca_results task

'do_twstats_on_pca_results': {
                        'command': "twstats -t ../../twtable -i %qcedplink_pca_eval -o qcedplink_pca.tw | echo 'twstats run on pca results of qced plink bfiles'"
                },

End product after adding all four commands

if I.sexinfo_available:
        stages = {
                'find_SNPs_extreme_diff_miss': {
                        "command": perl + " ../../select_diffmiss_qcplink.pl %cut_diff_miss"
                },
                'find_SNPs_extreme_HWE_deviations': {
                        'command': plink + " --bfile %sample_qced_plink_bfiles --hardy --out clean_inds_qcplink_hwe"
                },
                'find_unaffected_for_HWE_plot': {
                        'command': "head -1 clean_inds_qcplink_hwe.hwe > clean_inds_qcplink_hweu.hwe | grep 'UNAFF' clean_inds_qcplink_hwe.hwe >> clean_inds_qcplink_hweu.hwe"
                },
                'generate_hwe_plot': {
                        'command': "Rscript ../../hwe_plot_qcplink.R"
                },
                'remove_SNPs_failing_Qc': {
                        'command': plink + " --bfile %sample_qced_plink_bfiles --maf %cut_maf --geno %cut_geno --exclude %fail_diffmiss --hwe %cut_hwe --make-bed --out clean_qcplink"
                },
                'find_xchr_SNPs': {
                        'command': plink + " --bfile %SNP_qced_plink_bfiles --chr 23 --make-bed --out xsnps"
                },
                'remove_xchr_SNPs': {
                        'command': plink + " --bfile %SNP_qced_plink_bfiles --exclude %x_SNPs_bim --make-bed --out qced_plink"
                },
                'prune_qced_plink_for_pca': {
                        'command': plink + " --bfile %fully_qced_plink_bfiles --indep-pairwise 50 5 0.2 --out qced_plink_pruned"
                },
                'extract_prunein_qcedplink_for_pca': {
                        'command': plink + " --bfile %fully_qced_plink_bfiles --extract %prune_in_snps --make-bed --out qced_plink_pruned"
                },
                'do_pca_on_pruned_qcedplink': {
                        'command': "bash ../../runpca.sh qced_plink_pruned"
                },
                'do_twstats_on_pca_results': {
                        'command': "twstats -t ../../twtable -i %qcedplink_pca_eval -o qcedplink_pca.tw | echo 'twstats run on pca results of qced plink bfiles'"
                },
            }
	
else:
        stages = {
                'find_SNPs_extreme_diff_miss': {
                        "command": perl + " ../../select_diffmiss_qcplink.pl %cut_diff_miss"
                },
                'find_SNPs_extreme_HWE_deviations': {
                        'command': plink1 + " --noweb --bfile %sample_qced_plink_bfiles --allow-no-sex --hardy --out clean_inds_qcplink_hwe"
                },
                'find_unaffected_for_HWE_plot': {
                        'command': "head -1 clean_inds_qcplink_hwe.hwe > clean_inds_qcplink_hweu.hwe | grep 'UNAFF' clean_inds_qcplink_hwe.hwe >> clean_inds_qcplink_hweu.hwe"
                },
                'generate_hwe_plot': {
                        'command': "Rscript ../../hwe_plot_qcplink.R"
                },
                'remove_SNPs_failing_Qc': {
                        'command': plink1 + " --noweb --bfile %sample_qced_plink_bfiles --allow-no-sex --maf %cut_maf --geno %cut_geno --exclude %fail_diffmiss --hwe %cut_hwe --make-bed --out clean_qcplink"
                },
                'find_xchr_SNPs': {
                        'command': "touch xsnps.bim | echo no sex info available hence no x chromosome to find in %SNP_qced_plink_bfiles"
                },
                'remove_xchr_SNPs': {
                        'command': plink + " --bfile %SNP_qced_plink_bfiles --exclude %x_SNPs_bim --make-bed --out qced_plink"
                },
                'prune_qced_plink_for_pca': {
                        'command': plink + " --bfile %fully_qced_plink_bfiles --allow-no-sex --indep-pairwise 50 5 0.2 --out qced_plink_pruned"
                },
                'extract_prunein_qcedplink_for_pca': {
                        'command': plink + " --bfile %fully_qced_plink_bfiles --allow-no-sex --extract %prune_in_snps --make-bed --out qced_plink_pruned"
                },
                'do_pca_on_pruned_qcedplink': {
                        'command': "bash ../../runpca.sh qced_plink_pruned"
                },
                'do_twstats_on_pca_results': {
                        'command': "twstats -t ../../twtable -i %qcedplink_pca_eval -o qcedplink_pca.tw | echo 'twstats run on pca results of qced plink bfiles'"
                },
            }

Saving

Save the extended pipeline_qcplink_tasks15-20of20_stages_config.py file as pipeline_qcplink_tasks15-20of20_stages_config_extended.py

All done! we can now run the pipeline

cd witsGWAS/
rubra pipeline_qcplink_tasks15-20of20_extended.py --config pipeline_qcplink_tasks15-20of20_config_extended.py pipeline_qcplink_tasks15-20of20_stages_config_extended.py PlinkUserInput.py --style run
Clone this wiki locally