Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object ScaldingBuild extends Build {
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
val hbaseVersion = "0.94.10"
val hravenVersion = "0.9.16"
val hravenVersion = "0.9.17.t05"
val jacksonVersion = "2.4.2"
val json4SVersion = "3.2.11"
val paradiseVersion = "2.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,27 +215,19 @@ final case class FlowStepHistory(keys: FlowStepKeys,
reduceShuffleBytes: Long,
cost: Double,
tasks: Seq[Task])

final case class FlowStepKeys(jobName: String,
user: String,
priority: String,
status: String,
version: String,
queue: String)

final case class Task(taskId: String,
final case class Task(
taskType: String,
status: String,
splits: Seq[String],
startTime: Long,
finishTime: Long,
taskAttemptId: String,
trackerName: String,
httpPort: Int,
hostname: String,
state: String,
error: String,
shuffleFinished: Long,
sortFinished: Long)
finishTime: Long)

/**
* Provider of information about prior runs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,10 @@ object HistoryServiceWithData {
val tasks = taskRuntimes.map { time =>
val startTime = random.nextLong
Task(
taskId = "foo",
taskType = "REDUCE",
status = "SUCCEEDED",
splits = Seq(),
startTime = startTime,
finishTime = startTime + time,
taskAttemptId = "foo",
trackerName = "foo",
httpPort = random.nextInt,
hostname = "foo",
state = "foo",
error = "foo",
shuffleFinished = random.nextInt,
sortFinished = random.nextInt)
finishTime = startTime + time)
}

FlowStepHistory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ object HRavenHistoryService extends HistoryService {

private val LOG = LoggerFactory.getLogger(this.getClass)

// List of fields that we consume from fetchTaskDetails api.
// This is sent to hraven service to filter the response data
// and avoid hitting http content length limit on hraven side.
private val TaskDetailFields = List(
"taskType",
"status",
"startTime",
"finishTime").asJava

val RequiredJobConfigs = Seq("cascading.flow.step.num")

case class MissingFieldsException(fields: Seq[String]) extends Exception
Expand Down Expand Up @@ -84,7 +93,7 @@ object HRavenHistoryService extends HistoryService {
flow.getJobs.asScala.foreach { job =>

// client.fetchTaskDetails might throw IOException
val tasks = client.fetchTaskDetails(flow.getCluster, job.getJobId)
val tasks = client.fetchTaskDetails(flow.getCluster, job.getJobId, TaskDetailFields)
job.addTasks(tasks)
}
}
Expand Down Expand Up @@ -165,10 +174,33 @@ object HRavenHistoryService extends HistoryService {
for {
step <- history
keys = FlowStepKeys(step.getJobName, step.getUser, step.getPriority, step.getStatus, step.getVersion, "")
tasks = step.getTasks.asScala.map{ t => Task(t.getTaskId, t.getType, t.getStatus, t.getSplits.toSeq, t.getStartTime, t.getFinishTime, t.getTaskAttemptId, t.getTrackerName, t.getHttpPort, t.getHostname, t.getState, t.getError, t.getShuffleFinished, t.getSortFinished) }
} yield FlowStepHistory(keys, step.getSubmitTime, step.getLaunchTime, step.getFinishTime, step.getTotalMaps, step.getTotalReduces, step.getFinishedMaps, step.getFinishedReduces, step.getFailedMaps, step.getFailedReduces, step.getMapFileBytesRead, step.getMapFileBytesWritten, step.getReduceFileBytesRead, step.getHdfsBytesRead, step.getHdfsBytesWritten, step.getMapSlotMillis, step.getReduceSlotMillis, step.getReduceShuffleBytes, 0, tasks)
// update HRavenHistoryService.TaskDetailFields when consuming additional task fields from hraven below
tasks = step.getTasks.asScala.map { t => Task(t.getType, t.getStatus, t.getStartTime, t.getFinishTime) }
} yield toFlowStepHistory(keys, step, tasks)
}

private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) =
FlowStepHistory(
keys = keys,
submitTime = step.getSubmitTime,
launchTime = step.getLaunchTime,
finishTime = step.getFinishTime,
totalMaps = step.getTotalMaps,
totalReduces = step.getTotalReduces,
finishedMaps = step.getFinishedMaps,
finishedReduces = step.getFinishedReduces,
failedMaps = step.getFailedMaps,
failedReduces = step.getFailedReduces,
mapFileBytesRead = step.getMapFileBytesRead,
mapFileBytesWritten = step.getMapFileBytesWritten,
reduceFileBytesRead = step.getReduceFileBytesRead,
hdfsBytesRead = step.getHdfsBytesRead,
hdfsBytesWritten = step.getHdfsBytesWritten,
mapperTimeMillis = step.getMapSlotMillis,
reducerTimeMillis = step.getReduceSlotMillis,
reduceShuffleBytes = step.getReduceShuffleBytes,
cost = 0,
tasks = tasks)
}

class HRavenRatioBasedEstimator extends RatioBasedEstimator {
Expand Down