Skip to content
This repository has been archived by the owner on Oct 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #136 from deflaux/master
Browse files Browse the repository at this point in the history
Bump utils-java version.
  • Loading branch information
ssgross committed Aug 18, 2015
2 parents a7c3f6e + 1712279 commit d41201e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
<dependency>
<groupId>com.google.cloud.genomics</groupId>
<artifactId>google-genomics-utils</artifactId>
<version>v1beta2-0.32</version>
<version>v1beta2-0.33</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by a transitive dependency google-api-client 1.19.0 -->
Expand All @@ -148,6 +148,11 @@
<artifactId>gov.nist.math.jama</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.2</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
Expand Down Expand Up @@ -203,7 +208,7 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0-alpha-3</version>
</dependency>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.ReadStreamIterator;
import com.google.common.base.Stopwatch;
import com.google.genomics.v1.Read;
import com.google.genomics.v1.StreamReadsRequest;
import com.google.genomics.v1.StreamReadsResponse;
Expand Down Expand Up @@ -65,20 +68,26 @@ private class RetrieveReads extends DoFn<StreamReadsRequest, List<Read>> {

protected Aggregator<Integer, Integer> initializedShardCount;
protected Aggregator<Integer, Integer> finishedShardCount;
protected Aggregator<Long, Long> shardTimeMaxSec;

public RetrieveReads() {
initializedShardCount = createAggregator("Initialized Shard Count", new Sum.SumIntegerFn());
finishedShardCount = createAggregator("Finished Shard Count", new Sum.SumIntegerFn());
shardTimeMaxSec = createAggregator("Maximum Shard Processing Time (sec)", new Max.MaxLongFn());
}

@Override
public void processElement(ProcessContext c) throws IOException, GeneralSecurityException {
initializedShardCount.addValue(1);
shardTimeMaxSec.addValue(0L);
Stopwatch stopWatch = Stopwatch.createStarted();
Iterator<StreamReadsResponse> iter = new ReadStreamIterator(c.element(), auth, shardBoundary, fields);
while (iter.hasNext()) {
StreamReadsResponse readResponse = iter.next();
c.output(readResponse.getAlignmentsList());
}
stopWatch.stop();
shardTimeMaxSec.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
finishedShardCount.addValue(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.VariantStreamIterator;
import com.google.common.base.Stopwatch;
import com.google.genomics.v1.StreamVariantsRequest;
import com.google.genomics.v1.StreamVariantsResponse;
import com.google.genomics.v1.Variant;
Expand All @@ -37,6 +44,7 @@
public class VariantStreamer extends
PTransform<PCollection<StreamVariantsRequest>, PCollection<Variant>> {

private static final Logger LOG = LoggerFactory.getLogger(VariantStreamer.class);
protected final GenomicsFactory.OfflineAuth auth;
protected final ShardBoundary.Requirement shardBoundary;
protected final String fields;
Expand All @@ -63,25 +71,33 @@ public PCollection<Variant> apply(PCollection<StreamVariantsRequest> input) {
private class RetrieveVariants extends DoFn<StreamVariantsRequest, List<Variant>> {

protected Aggregator<Integer, Integer> initializedShardCount;
protected Aggregator<Long, Long> itemCount;
protected Aggregator<Integer, Integer> finishedShardCount;
protected Aggregator<Long, Long> shardTimeMaxSec;
DescriptiveStatistics stats;

public RetrieveVariants() {
initializedShardCount = createAggregator("Initialized Shard Count", new Sum.SumIntegerFn());
itemCount = createAggregator("Number of variant lists", new Sum.SumLongFn());
finishedShardCount = createAggregator("Finished Shard Count", new Sum.SumIntegerFn());
shardTimeMaxSec = createAggregator("Maximum Shard Processing Time (sec)", new Max.MaxLongFn());
stats = new DescriptiveStatistics(500);
}

@Override
public void processElement(ProcessContext c) throws IOException, GeneralSecurityException, InterruptedException {
initializedShardCount.addValue(1);
shardTimeMaxSec.addValue(0L);
Stopwatch stopWatch = Stopwatch.createStarted();
Iterator<StreamVariantsResponse> iter = new VariantStreamIterator(c.element(), auth, shardBoundary, fields);
while (iter.hasNext()) {
StreamVariantsResponse variantResponse = iter.next();
c.output(variantResponse.getVariantsList());
itemCount.addValue(1L);
}
stopWatch.stop();
shardTimeMaxSec.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
stats.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
finishedShardCount.addValue(1);
LOG.info("Shard Duration in Seconds - Min: " + stats.getMin() + " Max: " + stats.getMax() +
" Avg: " + stats.getMean() + " StdDev: " + stats.getStandardDeviation());
}
}

Expand Down

0 comments on commit d41201e

Please sign in to comment.