diff --git a/pom.xml b/pom.xml
index d166644..1bb3004 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
com.google.cloud.genomics
google-genomics-utils
- v1beta2-0.32
+ v1beta2-0.33
@@ -148,6 +148,11 @@
gov.nist.math.jama
1.1.1
+
+ org.apache.commons
+ commons-math3
+ 3.2
+
org.reflections
reflections
@@ -203,7 +208,7 @@
com.google.protobuf
protobuf-java
3.0.0-alpha-3
-
+
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadStreamer.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadStreamer.java
index 9dc2b95..dcf3643 100644
--- a/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadStreamer.java
+++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadStreamer.java
@@ -17,9 +17,11 @@
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
@@ -27,6 +29,7 @@
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.ReadStreamIterator;
+import com.google.common.base.Stopwatch;
import com.google.genomics.v1.Read;
import com.google.genomics.v1.StreamReadsRequest;
import com.google.genomics.v1.StreamReadsResponse;
@@ -65,20 +68,26 @@ private class RetrieveReads extends DoFn> {
protected Aggregator initializedShardCount;
protected Aggregator finishedShardCount;
+ protected Aggregator shardTimeMaxSec;
public RetrieveReads() {
initializedShardCount = createAggregator("Initialized Shard Count", new Sum.SumIntegerFn());
finishedShardCount = createAggregator("Finished Shard Count", new Sum.SumIntegerFn());
+ shardTimeMaxSec = createAggregator("Maximum Shard Processing Time (sec)", new Max.MaxLongFn());
}
@Override
public void processElement(ProcessContext c) throws IOException, GeneralSecurityException {
initializedShardCount.addValue(1);
+ shardTimeMaxSec.addValue(0L);
+ Stopwatch stopWatch = Stopwatch.createStarted();
Iterator iter = new ReadStreamIterator(c.element(), auth, shardBoundary, fields);
while (iter.hasNext()) {
StreamReadsResponse readResponse = iter.next();
c.output(readResponse.getAlignmentsList());
}
+ stopWatch.stop();
+ shardTimeMaxSec.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
finishedShardCount.addValue(1);
}
}
diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantStreamer.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantStreamer.java
index fcf8ba2..94ec6fa 100644
--- a/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantStreamer.java
+++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantStreamer.java
@@ -17,9 +17,15 @@
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
@@ -27,6 +33,7 @@
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.VariantStreamIterator;
+import com.google.common.base.Stopwatch;
import com.google.genomics.v1.StreamVariantsRequest;
import com.google.genomics.v1.StreamVariantsResponse;
import com.google.genomics.v1.Variant;
@@ -37,6 +44,7 @@
public class VariantStreamer extends
PTransform, PCollection> {
+ private static final Logger LOG = LoggerFactory.getLogger(VariantStreamer.class);
protected final GenomicsFactory.OfflineAuth auth;
protected final ShardBoundary.Requirement shardBoundary;
protected final String fields;
@@ -63,25 +71,33 @@ public PCollection apply(PCollection input) {
private class RetrieveVariants extends DoFn> {
protected Aggregator initializedShardCount;
- protected Aggregator itemCount;
protected Aggregator finishedShardCount;
+ protected Aggregator shardTimeMaxSec;
+ DescriptiveStatistics stats;
public RetrieveVariants() {
initializedShardCount = createAggregator("Initialized Shard Count", new Sum.SumIntegerFn());
- itemCount = createAggregator("Number of variant lists", new Sum.SumLongFn());
finishedShardCount = createAggregator("Finished Shard Count", new Sum.SumIntegerFn());
+ shardTimeMaxSec = createAggregator("Maximum Shard Processing Time (sec)", new Max.MaxLongFn());
+ stats = new DescriptiveStatistics(500);
}
@Override
public void processElement(ProcessContext c) throws IOException, GeneralSecurityException, InterruptedException {
initializedShardCount.addValue(1);
+ shardTimeMaxSec.addValue(0L);
+ Stopwatch stopWatch = Stopwatch.createStarted();
Iterator iter = new VariantStreamIterator(c.element(), auth, shardBoundary, fields);
while (iter.hasNext()) {
StreamVariantsResponse variantResponse = iter.next();
c.output(variantResponse.getVariantsList());
- itemCount.addValue(1L);
}
+ stopWatch.stop();
+ shardTimeMaxSec.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
+ stats.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
finishedShardCount.addValue(1);
+ LOG.info("Shard Duration in Seconds - Min: " + stats.getMin() + " Max: " + stats.getMax() +
+ " Avg: " + stats.getMean() + " StdDev: " + stats.getStandardDeviation());
}
}