diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala index 3820886fd5..15bf5a1bd6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala @@ -1,9 +1,5 @@ package com.twitter.scalding.reducer_estimation -import scala.collection.JavaConverters._ -import cascading.flow.FlowStep -import cascading.tap.{ Tap, CompositeTap } -import cascading.tap.hadoop.Hfs import org.apache.hadoop.mapred.JobConf import org.slf4j.LoggerFactory @@ -193,15 +189,11 @@ trait RuntimeReducerEstimator extends HistoryReducerEstimator { } object MedianEstimationScheme extends RuntimeEstimationScheme { - import reducer_estimation.{ mean, median } - - def estimateJobTime(times: Seq[Double]) = mean(times) + def estimateJobTime(times: Seq[Double]) = median(times) def estimateTaskTime(times: Seq[Double]) = median(times) } object MeanEstimationScheme extends RuntimeEstimationScheme { - import reducer_estimation.{ mean, median } - def estimateJobTime(times: Seq[Double]) = mean(times) def estimateTaskTime(times: Seq[Double]) = mean(times) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/package.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/package.scala index 1134a08c03..eb998f4071 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/package.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/package.scala @@ -1,3 +1,5 @@ +package com.twitter.scalding + package object reducer_estimation { def median(xs: Seq[Double]): Option[Double] = xs.sorted.lift(xs.length / 2) def mean(xs: Seq[Double]): Option[Double] = if (xs.isEmpty) None else Some(xs.sum / xs.length) diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala index 6e4b50194c..71f460c26a 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala +++ b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala @@ -5,7 +5,7 @@ import com.twitter.scalding.reducer_estimation.RuntimeReducerEstimator.{ Runtime import com.twitter.scalding.platform.{ HadoopPlatformJobTest, HadoopSharedPlatformTest } import org.scalatest.{ Matchers, WordSpec } import scala.collection.JavaConverters._ -import scala.util.{ Failure, Success, Try } +import scala.util.{ Success, Try } object HistoryService1 extends HistoryServiceWithData { import HistoryServiceWithData._ @@ -35,7 +35,6 @@ class DummyEstimator extends ReducerEstimator { } class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { - import HipJob._ "Single-step job with runtime-based reducer estimator" should { "set reducers correctly with median estimation scheme" in { @@ -63,7 +62,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar // (1200 / inputSize) ms per byte // (1800 / inputSize) ms per byte // - // The mean of these is (1500 / inputSize) ms per byte, + // The median of these is (1500 / inputSize) ms per byte, // so we anticipate that processing (inputSize bytes) // will take 1500 ms total. // To do this in 25 ms, we need 60 reducers. @@ -158,11 +157,11 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar // // We don't scale by input size. // - // The mean of these is 3600 ms, so we anticipate - // that the job will take 3600 ms total. + // The median of these is 3000 ms, so we anticipate + // that the job will take 3000 ms total. // - // To do this in 25 ms, we need 144 reducers. - assert(conf.getNumReduceTasks == 144) + // To do this in 25 ms, we need 120 reducers. + assert(conf.getNumReduceTasks == 120) } .run }