Skip to content

Commit

Permalink
Samtools (#47)
Browse files Browse the repository at this point in the history
* add samtools (without documentation) to minimap2 branch.
  • Loading branch information
Koeng101 authored Jan 1, 2024
1 parent bacf1de commit 01b183e
Show file tree
Hide file tree
Showing 18 changed files with 1,361 additions and 70 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ jobs:
mkdir -p $HOME/bin
cp ./minimap2-2.26_x64-linux/minimap2 $HOME/bin
echo "$HOME/bin" >> $GITHUB_PATH
- name: Install dependencies for samtools
run: |
sudo apt-get update
sudo apt-get install -y libncurses5-dev libbz2-dev liblzma-dev zlib1g-dev
- name: Download and install samtools
run: |
curl -L https://github.com/samtools/samtools/releases/download/1.13/samtools-1.13.tar.bz2 | tar -jxvf -
cd samtools-1.13
./configure --prefix=$HOME/samtools
make
make install
echo "$HOME/samtools/bin" >> $GITHUB_PATH
- name: Test external
run: go test -v ./external/...
openbsd-test:
Expand Down
33 changes: 2 additions & 31 deletions external/minimap2/minimap2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ For more information on minimap2, please visit Heng Li's git: https://github.com
package minimap2

import (
"context"
"io"
"os"
"os/exec"

"golang.org/x/sync/errgroup"
)

// Minimap2 aligns sequences using minimap2 over the command line. Right
Expand Down Expand Up @@ -69,37 +66,11 @@ func Minimap2(templateFastaInput io.Reader, fastqInput io.Reader, w io.Writer) e

// Start minimap2 pointing to the temporary file and stdin for sequencing data
cmd := exec.Command("minimap2", "-ax", "map-ont", tmpFile.Name(), "-")
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
cmd.Stdout = w
cmd.Stdin = fastqInput
if err := cmd.Start(); err != nil {
return err
}

// Create an errgroup group to manage goroutines
g, _ := errgroup.WithContext(context.Background())

// Write data to the stdin of minimap2 (sequencing data) using a goroutine
g.Go(func() error {
defer stdin.Close()
_, err := io.Copy(stdin, fastqInput)
return err
})

// Start copying the output of minimap2 to w using a goroutine
g.Go(func() error {
_, err := io.Copy(w, stdout)
return err
})

// Wait for all goroutines to complete
if err := g.Wait(); err != nil {
return err
}
return cmd.Wait()
}
2 changes: 1 addition & 1 deletion external/minimap2/minimap2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMinimap2(t *testing.T) {
// Prepare the writer to capture the output
var buf bytes.Buffer

// Execute the Minimap2Raw function
// Execute the Minimap2 function
err = minimap2.Minimap2(templateFile, fastqFile, &buf)
if err != nil {
t.Errorf("Minimap2Raw returned an error: %v", err)
Expand Down
63 changes: 63 additions & 0 deletions external/samtools/data/aln.sam

Large diffs are not rendered by default.

236 changes: 236 additions & 0 deletions external/samtools/data/reads.fastq

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions external/samtools/data/template.fasta
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
>Cn_Pp_Killer-alpha-MF-Delta
AATGACCAAACCAACGCAAGTCTTAGTTCGTTCAGTCTCTATTTTATTCTTCATCACACTGTTGCACTTGGTTGTTGCAATGAGATTTCCTAGTATTTTCACTGCTGTGCTATTTGCCGCTAGTTCCGCTCTAGCTGCTCCAGTTAATACTACTACTGAAGATGAATTGGAGGGTGACTTCGATGTTGCTGTTCTGCCTTTTTCCGCTTCTATCGCAGCCAAGGAAGAAGGTGTATCTCTAGAGAAGCGTC
106 changes: 106 additions & 0 deletions external/samtools/samtools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Package samtools wraps the samtools cli to be used with Go.
*/
package samtools

import (
"context"
"io"
"os"
"os/exec"
"syscall"

"golang.org/x/sync/errgroup"
)

// Pileup runs a
func Pileup(templateFastas io.Reader, samAlignments io.Reader, w io.Writer) error {
// Create a temporary file for the template fasta
tmpFile, err := os.CreateTemp("", "template_*.fasta")
if err != nil {
return err
}
defer os.Remove(tmpFile.Name()) // Clean up file afterwards

// Write template fasta data to the temporary file
if _, err := io.Copy(tmpFile, templateFastas); err != nil {
return err
}
tmpFile.Close() // Close the file as it's no longer needed

g, ctx := errgroup.WithContext(context.Background())

// Setup pipe connections between commands
viewSortReader, viewSortWriter := io.Pipe()
sortMpileupReader, sortMpileupWriter := io.Pipe()

// Define commands with context
viewCmd := exec.CommandContext(ctx, "samtools", "view", "-bF", "4")
sortCmd := exec.CommandContext(ctx, "samtools", "sort", "-")
mpileupCmd := exec.CommandContext(ctx, "samtools", "mpileup", "-f", tmpFile.Name(), "-")

// Goroutine for the first command: samtools view
g.Go(func() error {
defer viewSortWriter.Close() // ensure the pipe is closed after this function exits

viewCmd.Stdin = samAlignments
viewCmd.Stdout = viewSortWriter

if err := viewCmd.Start(); err != nil {
return err
}

select {
case <-ctx.Done():
viewCmd.Process.Signal(syscall.SIGTERM)
return ctx.Err()
default:
return viewCmd.Wait()
}
})

// Goroutine for the second command: samtools sort
g.Go(func() error {
defer sortMpileupWriter.Close() // ensure the pipe is closed after this function exits

sortCmd.Stdin = viewSortReader
sortCmd.Stdout = sortMpileupWriter

if err := sortCmd.Start(); err != nil {
return err
}

select {
case <-ctx.Done():
sortCmd.Process.Signal(syscall.SIGTERM)
return ctx.Err()
default:
return sortCmd.Wait()
}
})

// Goroutine for the third command: samtools mpileup
g.Go(func() error {
mpileupCmd.Stdin = sortMpileupReader
mpileupCmd.Stdout = w

if err := mpileupCmd.Start(); err != nil {
return err
}

select {
case <-ctx.Done():
mpileupCmd.Process.Signal(syscall.SIGTERM)
return ctx.Err()
default:
return mpileupCmd.Wait()
}
})

// Wait for all goroutines to complete and return the first non-nil error
if err := g.Wait(); err != nil {
return err
}

return nil
}
47 changes: 47 additions & 0 deletions external/samtools/samtools_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package samtools_test

import (
"bytes"
"os"
"testing"

"github.com/koeng101/dnadesign/external/samtools"
"github.com/koeng101/dnadesign/lib/bio"
)

func TestPileup(t *testing.T) {
// Open the template FASTA file
templateFile, err := os.Open("./data/template.fasta")
if err != nil {
t.Fatalf("Failed to open template FASTA file: %v", err)
}
defer templateFile.Close()

// Open the sam file
samFile, err := os.Open("./data/aln.sam")
if err != nil {
t.Fatalf("Failed to open sam alignment file: %v", err)
}
defer samFile.Close()

// Prepare the writer to capture the output
var buf bytes.Buffer

// Execute the pileup function
err = samtools.Pileup(templateFile, samFile, &buf)
if err != nil {
t.Errorf("Pileup returned error: %s", err)
}

// Read as pileup file
parser := bio.NewPileupParser(&buf)
lines, err := parser.Parse()
if err != nil {
t.Errorf("Failed while parsing: %s", err)
}

expectedQuality := "3555457667367556657568659884340:7"
if lines[5].Quality != expectedQuality {
t.Errorf("Bad quality return. Got: %s Expected: %s", lines[5].Quality, expectedQuality)
}
}
39 changes: 27 additions & 12 deletions lib/bio/bio.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/koeng101/dnadesign/lib/bio/fastq"
"github.com/koeng101/dnadesign/lib/bio/genbank"
"github.com/koeng101/dnadesign/lib/bio/pileup"
"github.com/koeng101/dnadesign/lib/bio/sam"
"github.com/koeng101/dnadesign/lib/bio/slow5"
"github.com/koeng101/dnadesign/lib/bio/uniprot"
"golang.org/x/sync/errgroup"
Expand All @@ -33,6 +34,7 @@ const (
Fastq
Genbank
Slow5
Sam
Pileup
)

Expand All @@ -48,6 +50,7 @@ var DefaultMaxLengths = map[Format]int{
Fastq: 8 * 1024 * 1024, // The longest single nanopore sequencing read so far is 4Mb. A 8mb buffer should be large enough for any sequencing.
Genbank: defaultMaxLineLength,
Slow5: 128 * 1024 * 1024, // 128mb is used because slow5 lines can be massive, since a single read can be many millions of base pairs.
Sam: defaultMaxLineLength,
Pileup: defaultMaxLineLength,
}

Expand Down Expand Up @@ -89,36 +92,36 @@ type Parser[Data io.WriterTo, Header io.WriterTo] struct {
}

// NewFastaParser initiates a new FASTA parser from an io.Reader.
func NewFastaParser(r io.Reader) (*Parser[*fasta.Record, *fasta.Header], error) {
func NewFastaParser(r io.Reader) *Parser[*fasta.Record, *fasta.Header] {
return NewFastaParserWithMaxLineLength(r, DefaultMaxLengths[Fasta])
}

// NewFastaParserWithMaxLineLength initiates a new FASTA parser from an
// io.Reader and a user-given maxLineLength.
func NewFastaParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*fasta.Record, *fasta.Header], error) {
return &Parser[*fasta.Record, *fasta.Header]{parserInterface: fasta.NewParser(r, maxLineLength)}, nil
func NewFastaParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*fasta.Record, *fasta.Header] {
return &Parser[*fasta.Record, *fasta.Header]{parserInterface: fasta.NewParser(r, maxLineLength)}
}

// NewFastqParser initiates a new FASTQ parser from an io.Reader.
func NewFastqParser(r io.Reader) (*Parser[*fastq.Read, *fastq.Header], error) {
func NewFastqParser(r io.Reader) *Parser[*fastq.Read, *fastq.Header] {
return NewFastqParserWithMaxLineLength(r, DefaultMaxLengths[Fastq])
}

// NewFastqParserWithMaxLineLength initiates a new FASTQ parser from an
// io.Reader and a user-given maxLineLength.
func NewFastqParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*fastq.Read, *fastq.Header], error) {
return &Parser[*fastq.Read, *fastq.Header]{parserInterface: fastq.NewParser(r, maxLineLength)}, nil
func NewFastqParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*fastq.Read, *fastq.Header] {
return &Parser[*fastq.Read, *fastq.Header]{parserInterface: fastq.NewParser(r, maxLineLength)}
}

// NewGenbankParser initiates a new Genbank parser form an io.Reader.
func NewGenbankParser(r io.Reader) (*Parser[*genbank.Genbank, *genbank.Header], error) {
func NewGenbankParser(r io.Reader) *Parser[*genbank.Genbank, *genbank.Header] {
return NewGenbankParserWithMaxLineLength(r, DefaultMaxLengths[Genbank])
}

// NewGenbankParserWithMaxLineLength initiates a new Genbank parser from an
// io.Reader and a user-given maxLineLength.
func NewGenbankParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*genbank.Genbank, *genbank.Header], error) {
return &Parser[*genbank.Genbank, *genbank.Header]{parserInterface: genbank.NewParser(r, maxLineLength)}, nil
func NewGenbankParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*genbank.Genbank, *genbank.Header] {
return &Parser[*genbank.Genbank, *genbank.Header]{parserInterface: genbank.NewParser(r, maxLineLength)}
}

// NewSlow5Parser initiates a new SLOW5 parser from an io.Reader.
Expand All @@ -133,15 +136,27 @@ func NewSlow5ParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*s
return &Parser[*slow5.Read, *slow5.Header]{parserInterface: parser}, err
}

// NewSamParser initiates a new SAM parser from an io.Reader.
func NewSamParser(r io.Reader) (*Parser[*sam.Alignment, *sam.Header], error) {
return NewSamParserWithMaxLineLength(r, DefaultMaxLengths[Sam])
}

// NewSamParserWithMaxLineLength initiates a new SAM parser from an io.Reader
// and a user-given maxLineLength.
func NewSamParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*sam.Alignment, *sam.Header], error) {
parser, _, err := sam.NewParser(r, maxLineLength)
return &Parser[*sam.Alignment, *sam.Header]{parserInterface: parser}, err
}

// NewPileupParser initiates a new Pileup parser from an io.Reader.
func NewPileupParser(r io.Reader) (*Parser[*pileup.Line, *pileup.Header], error) {
func NewPileupParser(r io.Reader) *Parser[*pileup.Line, *pileup.Header] {
return NewPileupParserWithMaxLineLength(r, DefaultMaxLengths[Pileup])
}

// NewPileupParserWithMaxLineLength initiates a new Pileup parser from an
// io.Reader and a user-given maxLineLength.
func NewPileupParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*pileup.Line, *pileup.Header], error) {
return &Parser[*pileup.Line, *pileup.Header]{parserInterface: pileup.NewParser(r, maxLineLength)}, nil
func NewPileupParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*pileup.Line, *pileup.Header] {
return &Parser[*pileup.Line, *pileup.Header]{parserInterface: pileup.NewParser(r, maxLineLength)}
}

// NewUniprotParser initiates a new Uniprot parser from an io.Reader. No
Expand Down
Loading

0 comments on commit 01b183e

Please sign in to comment.