Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying to get Spark pipeline working with slightly out of date code. #1313

Closed
TomNash opened this issue Dec 12, 2016 · 20 comments
Closed

Trying to get Spark pipeline working with slightly out of date code. #1313

TomNash opened this issue Dec 12, 2016 · 20 comments
Labels
Milestone

Comments

@TomNash
Copy link

TomNash commented Dec 12, 2016

I'm going off the code posted here:
https://github.com/allenday/spark-genome-alignment-demo/blob/master/bin/bowtie_pipe_single.scala

I've made some changes successfully to accommodate changes in the ADAM code, but I'm getting hung up on trying to use the SAMRecordConverter. Currently the code is using the line below but this fails, saying the type is not found.

val samRecordConverter = new SAMRecordConverter

I've got no Scala experience, but I from what I understand the class settings of private[adam] SAMRecordConverter are pretty restrictive. Is there a way I can access and use it?

@TomNash TomNash changed the title Trying to get Spark pipeline working, slightly out of date code. Trying to get Spark pipeline working with slightly out of date code. Dec 12, 2016
@heuermh
Copy link
Member

heuermh commented Dec 12, 2016

We have a new Pipe API (#1114) to support this use case, then you wouldn't need any of the SAM file parsing or conversion stuff at all, rather something like

val reads = ac.loadAlignments(sys.env("DEMO")+"/build/data/reads.fq")

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = reads.pipe("bowtie ...")

Bowtie/bowtie2 is something worth having a proper Pipe API wrapper for, let me take a closer look. I don't remember if it can accept SAM as piped input, for example.

@TomNash
Copy link
Author

TomNash commented Jan 13, 2017

Sorry to bother again, just wanted to ask on a more concrete scale this time and make sure I understand as this is all new to me.

So the example starts with taking some FASTQ file and the goal is to align it with bowtie and write to SAM format. The way this code was doing it was using some perhaps overly complicated text parsing/formatting within Spark using some methods provided, then saving the file to SAM format.

What you're suggesting is to read in the FASTQ file (what is the reason for using loadAlignments as opposed to loadFastq here?), use the Pipes API to run the bowtie alignment (input here would be the appropriate bowtie command as if it were run over CLI?), then save that output to SAM format (using .saveAsSam(path, ...) I assume?)

So something like this:

val reads = ac.loadAlignments(sys.env("DEMO")+"/build/data/reads.fq")

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = reads.pipe("bowtie <align fastq to genome>")

pipedRdd.saveAsSam(<path to file>, asSingleFile = false, isSorted = false)

This would eliminate a lot of the original code, especially all of the mapping and filtering.

@heuermh
Copy link
Member

heuermh commented Jan 13, 2017

Yes, that looks about right. Any of the FASTQ-related load methods would work; they all return AlignmentRecordRDD.

@TomNash
Copy link
Author

TomNash commented Jan 23, 2017

First, thank you for the response. I'm able to run the code using the skeleton above.

Can you comment on the Pipe API or point me in the direction of some documentation? If we were to run reads.pipe("bowtie -S /path/to/genome /path/to/input.fastq"), does it run in any sort of distributed manner?

@fnothaft
Copy link
Member

Hi @TomNash! Glad to hear that you were able to get the skeleton working! The pipe API has some inline docs, but they are limited. I've opened #1368 to address this more fully.

In your example, that's pretty close to how it'd work! The pipe API will run that command in a distributed fashion, using ADAM/Spark to parallelize the work across all of the machines and to format the data that is going into the pipe. There are a few small differences from your example:

  • Instead of giving the path to an input FASTQ, you'd load the reads through sc.loadFragment (for paired FASTQ), and these would be streamed on standard in to the running command.
  • Is your reference file on a shared file system like an NFS? If not, you can pass this path to the pipe command via an optional files argument (see example below) and Spark will copy it locally to the node running the task. (Note: I have not extensively tested this)
  • You'll need to give some guidance to the pipe API on how the data should be formatted using an InFormatter and an OutFormatter. Example is below.

You can see an example of a similar command in our unit tests. In your case, I think your command would look like:

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.fragment.InterleavedFASTQInFormatter
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter }

implicit val tFormatter = InterleavedFASTQInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = ardd.pipe("bowtie -S $0",
  files = Seq("/path/to/genome"))

This would assume that bowtie is installed (and on the PATH) on every node in the cluster.

Let me know if this helps, or if you've got any questions!

@heuermh
Copy link
Member

heuermh commented Jan 24, 2017

As mentioned elsewhere, we would like to build up a repository of commonly used bioinformatics tools wrapped in the Pipe API, a proposal for which is here

https://github.com/heuermh/cannoli

The example above is not quite right. First, the scala compiler isn't able to deduce types from the implicits, so the pipe invocation needs some help. Second, the bowtie command requires - as an argument to read from stdin. Finally, including the indices as separate files won't work, because the bowtie <ebwt> argument specifies the base name instead of individual index files.

This seems to work for me
https://github.com/heuermh/cannoli/blob/master/src/main/scala/org/bdgenomics/cannoli/Bowtie.scala#L73

$ brew install homebrew/science/bowtie
$ bowtie-build ref.fa ref

$ brew install homebrew/science/adam
$ git clone https://github.com/heuermh/cannoli
$ cd cannoli
$ ./scripts/move_to_scala_2.11.sh
$ ./scripts/move_to_spark_2.sh
$ mvn install

$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli \
  adam-submit \
  --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar \
  -- \
  bowtie -single -bowtie_index ref reads.ifq bowtie.sam

Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/usr/local/bin/spark-submit
# reads processed: 6
# reads with at least one reported alignment: 0 (0.00%)
# reads that failed to align: 6 (100.00%)
No alignments

$ head bowtie.sam 
@HD	VN:1.5	SO:unsorted
H06HDADXX130110:2:2116:3345:91806	4	*	0	0	* ...

@fnothaft
Copy link
Member

Nice! Thats awesome, @heuermh! I will take more of a gander over the code later this week.

@TomNash
Copy link
Author

TomNash commented Jan 30, 2017

Tried using the format above from @heuermh , getting the following output:

Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/home/anderson-lab/software/spark/bin/spark-submit
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.bdgenomics.cannoli.Bowtie.<init>(Bowtie.scala:68)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:38)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:33)
        at org.bdgenomics.adam.cli.ADAMMain.apply(ADAMMain.scala:126)
        at org.bdgenomics.cannoli.Cannoli$.main(Cannoli.scala:29)
        at org.bdgenomics.cannoli.Cannoli.main(Cannoli.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Some Googling has lead me to believe this has to do with Maven dependency version issues, but not sure where that problem would arise here.

@fnothaft
Copy link
Member

Hi @TomNash! What versions of Java and Spark are you running?

@TomNash
Copy link
Author

TomNash commented Jan 30, 2017

Java:

java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

Scala:

Scala code runner version 2.11.4 -- Copyright 2002-2013, LAMP/EPFL

@heuermh
Copy link
Member

heuermh commented Jan 30, 2017

@TomNash that is what I see when running a build for Spark 1.x and Scala 2.10 on Spark 2.x with Scala 2.11. Did you use the move_to_spark_1.sh and move_to_scala_2.11.sh scripts when building cannoli as described above?

@TomNash
Copy link
Author

TomNash commented Jan 30, 2017

Ran both scripts. Here is full output from install. I've tried a mvn clean install as well, same result.

anderson-lab@genomics-vm:~/software$ rm -rf cannoli/
anderson-lab@genomics-vm:~/software$ git clone https://github.com/heuermh/cannoli                                                                                                                        
Cloning into 'cannoli'...
remote: Counting objects: 101, done.
remote: Compressing objects: 100% (34/34), done.
remote: Total 101 (delta 30), reused 97 (delta 26), pack-reused 0
Receiving objects: 100% (101/101), 25.64 KiB | 0 bytes/s, done.
Resolving deltas: 100% (30/30), done.
Checking connectivity... done.
anderson-lab@genomics-vm:~/software$ cd cannoli
anderson-lab@genomics-vm:~/software/cannoli$ ./scripts/move_to_scala_2.11.sh 
anderson-lab@genomics-vm:~/software/cannoli$ ./scripts/move_to_spark_2.sh 
anderson-lab@genomics-vm:~/software/cannoli$ mvn install -q                                                                                                                                              
Discovery starting.
Discovery completed in 88 milliseconds.
Run starting. Expected test count is: 0
DiscoverySuite:
Run completed in 166 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
No tests were executed.
model contains 20 documentable templates
anderson-lab@genomics-vm:~/software/cannoli$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli \
adam-submit --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar -- bowtie -single \
-bowtie_index /home/anderson-lab/spark-genome-alignment-demo/build/data/NC_008253 \
/home/anderson-lab/spark-genome-alignment-demo/build/data/reads.fq bowtie.sam
Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/home/anderson-lab/software/spark/bin/spark-submit
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.bdgenomics.cannoli.Bowtie.<init>(Bowtie.scala:68)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:38)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:33)
        at org.bdgenomics.adam.cli.ADAMMain.apply(ADAMMain.scala:126)
        at org.bdgenomics.cannoli.Cannoli$.main(Cannoli.scala:29)
        at org.bdgenomics.cannoli.Cannoli.main(Cannoli.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
anderson-lab@genomics-vm:~/software/cannoli$

@heuermh
Copy link
Member

heuermh commented Jan 30, 2017

Yeah ... --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar looks right.
What does spark-submit --version read?

@TomNash
Copy link
Author

TomNash commented Jan 30, 2017

anderson-lab@genomics-vm:~/software/cannoli$ $SPARK_HOME/bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
                        
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121
Branch 
Compiled by user jenkins on 2016-12-16T02:04:48Z
Revision 
Url 
Type --help for more information.

@heuermh
Copy link
Member

heuermh commented Jan 30, 2017

Hmm, I'm sorry, I don't see what problem could be.

I just tried on a new Mac

cannoli master!
$ history
  149  git clone https://github.com/heuermh/cannoli
  150  cd cannoli/
  151  ./scripts/move_to_scala_2.11.sh 
  152  ./scripts/move_to_spark_2.sh 
  153  mvn install
  154  cp ../adam/adam-core/src/test/resources/chr20.250k.fa.gz .
  155  gunzip chr20.250k.fa.gz 
  156  bowtie-build chr20.250k.fa chr20.250k

cannoli master!
$ brew install adam
==> Installing adam from homebrew/science
==> Downloading https://homebrew.bintray.com/bottles-science/adam-0.21.0.el_capi
######################################################################## 100.0%
==> Pouring adam-0.21.0.el_capitan.bottle.tar.gz
🍺  /usr/local/Cellar/adam/0.21.0: 363 files, 51.3M

cannoli master!
$ adam-submit --version
Using ADAM_MAIN=org.bdgenomics.adam.cli.ADAMMain
Using SPARK_SUBMIT=/usr/local/bin/spark-submit

       e         888~-_          e             e    e
      d8b        888   \        d8b           d8b  d8b
     /Y88b       888    |      /Y88b         d888bdY88b
    /  Y88b      888    |     /  Y88b       / Y88Y Y888b
   /____Y88b     888   /     /____Y88b     /   YY   Y888b
  /      Y88b    888_-~     /      Y88b   /          Y888b

ADAM version: 0.21.0
Built for: Scala 2.11.8 and Hadoop 2.7.3

cannoli master!
$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
                        
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_111
Branch 
Compiled by user jenkins on 2016-12-16T02:04:48Z
Revision 
Url 
Type --help for more information.

cannoli master!
$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli \
> adam-submit \
> --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar \
> -- \
> bowtie -single -bowtie_index chr20.250k ../adam/adam-core/src/test/resources/interleaved_fastq_sample1.ifq bowtie.sam
Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/usr/local/bin/spark-submit
2017-01-30 12:45:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
# reads processed: 6
# reads with at least one reported alignment: 0 (0.00%)
# reads that failed to align: 6 (100.00%)
No alignments

@TomNash
Copy link
Author

TomNash commented Feb 3, 2017

So the above example works after installing the binary ADAM release.

Does cannoli work only with interleaved FASTQ?

@chowbina
Copy link

chowbina commented Apr 5, 2017

Hello,

Thanks for the great tool. I get the following error while running bowtie using ADAM 0.22 and Spark 2.1

$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli adam-submit --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar -- bowtie -single -bowtie_index chr20.250k interleaved_fastq_sample1.ifq bowtie.sam

Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli Using SPARK_SUBMIT=/Users/sudhir-chowbina/Documents/Softwares/spark//bin/spark-submit 2017-04-05 11:59:39 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Error: reads file does not look like a FASTQ file Command: bowtie-align --wrapper basic-0 -S chr20.250k - 2017-04-05 11:59:48 WARN BlockManager:66 - Putting block rdd_2_0 failed due to an exception 2017-04-05 11:59:48 WARN BlockManager:66 - Block rdd_2_0 could not be removed as it was not found on disk or in memory 2017-04-05 11:59:48 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.RuntimeException: Piped command List(bowtie, -S, chr20.250k, -) exited with error code 1. at org.bdgenomics.adam.rdd.OutFormatterRunner.hasNext(OutFormatter.scala:37) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2017-04-05 11:59:48 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Piped command List(bowtie, -S, chr20.250k, -) exited with error code 1. at org.bdgenomics.adam.rdd.OutFormatterR

@fnothaft
Copy link
Member

fnothaft commented Apr 5, 2017

Ah, interesting! Thanks for posting the error @chowbina. @heuermh have you seen that before?

@heuermh
Copy link
Member

heuermh commented Apr 5, 2017

I've created a new issue in the cannoli repository https://github.com/heuermh/cannoli/issues/18 to track.

@fnothaft
Copy link
Member

Closing as this has moved downstream to bigdatagenomics/cannoli#18.

@heuermh heuermh modified the milestone: 0.23.0 Jul 22, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants