Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
*
* @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
* @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
*/
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
val eventCount = batch.numRecords
val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
Expand All @@ -65,7 +65,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
{formattedBatchTime}
</a>
</td>
<td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)

/**
* Return all of the event rates for each InputDStream in each batch. The key of the return value
* is the stream id, and the value is a sequence of batch time with its event rate.
* Return all of the record rates for each InputDStream in each batch. The key of the return value
* is the stream id, and the value is a sequence of batch time with its record rate.
*/
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
val _retainedBatches = retainedBatches
val latestBatches = _retainedBatches.map { batchUIData =>
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
}
streamIds.map { streamId =>
val eventRates = latestBatches.map {
val recordRates = latestBatches.map {
case (batchTime, streamIdToNumRecords) =>
val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
(batchTime, numRecords * 1000.0 / batchDuration)
}
(streamId, eventRates)
(streamId, recordRates)
}.toMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
* A helper class for "input rate" to generate data that will be used in the timeline and histogram
* graphs.
*
* @param data (batchTime, event-rate).
* @param data (batch time, record rate).
*/
private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
private[ui] class RecordRateUIData(val data: Seq[(Long, Double)]) {

val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)

Expand Down Expand Up @@ -215,7 +215,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max

val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
val recordRateForAllStreams = new RecordRateUIData(batches.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})

Expand All @@ -241,24 +241,24 @@ private[ui] class StreamingPage(parent: StreamingTab)

// Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
// If it's not an integral number, just use its ceil integral number.
val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
val minEventRate = 0L
val maxRecordRate = recordRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
val minRecordRate = 0L

val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)

val jsCollector = new JsCollector

val graphUIDataForEventRateOfAllStreams =
val graphUIDataForRecordRateOfAllStreams =
new GraphUIData(
"all-stream-events-timeline",
"all-stream-events-histogram",
eventRateForAllStreams.data,
"all-stream-records-timeline",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other changes seem OK as they're to labels and private classes and internal code; this changes a div ID and I'm not clear if something else has to change in the code to match? like will something else fail because it doesn't find all-stream-events-timeline? I don't see any other references though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen thank you for the detailed review!

Yeah normally this requires something else to change, but GraphUIData has been implemented well enough to save the trouble. We only need to change this "all-stream-events-timeline" here, then the div ID as well as any other place referring to this div will change accordingly; please see the snapshots of the HTML source:

div as a placeholder:
records-2-html-1

then fill in that div with data:
records-2-html-2

Thanks!

"all-stream-records-histogram",
recordRateForAllStreams.data,
minBatchTime,
maxBatchTime,
minEventRate,
maxEventRate,
"events/sec")
graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
minRecordRate,
maxRecordRate,
"records/sec")
graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector)

val graphUIDataForSchedulingDelay =
new GraphUIData(
Expand Down Expand Up @@ -334,16 +334,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
<div>Receivers: {listener.numActiveReceivers} / {numReceivers} active</div>
}
}
<div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
<div>Avg: {recordRateForAllStreams.formattedAvg} records/sec</div>
</div>
</td>
<td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
<td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
<td class="timeline">{graphUIDataForRecordRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
<td class="histogram">{graphUIDataForRecordRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
</tr>
{if (hasStream) {
<tr id="inputs-table" style="display: none;" >
<td colspan="3">
{generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
{generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minRecordRate, maxRecordRate)}
</td>
</tr>
}}
Expand Down Expand Up @@ -390,15 +390,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
val maxYCalculated = listener.receivedEventRateWithBatchTime.values
.flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } }
val maxYCalculated = listener.receivedRecordRateWithBatchTime.values
.flatMap { case streamAndRates => streamAndRates.map { case (_, recordRate) => recordRate } }
.reduceOption[Double](math.max)
.map(_.ceil.toLong)
.getOrElse(0L)

val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
case (streamId, eventRates) =>
generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated)
val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
case (streamId, recordRates) =>
generateInputDStreamRow(
jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)

// scalastyle:off
Expand All @@ -422,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
private def generateInputDStreamRow(
jsCollector: JsCollector,
streamId: Int,
eventRates: Seq[(Long, Double)],
recordRates: Seq[(Long, Double)],
minX: Long,
maxX: Long,
minY: Double,
Expand All @@ -447,25 +448,25 @@ private[ui] class StreamingPage(parent: StreamingTab)
val receiverLastErrorTime = receiverInfo.map {
r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
}.getOrElse(emptyCell)
val receivedRecords = new EventRateUIData(eventRates)
val receivedRecords = new RecordRateUIData(recordRates)

val graphUIDataForEventRate =
val graphUIDataForRecordRate =
new GraphUIData(
s"stream-$streamId-events-timeline",
s"stream-$streamId-events-histogram",
s"stream-$streamId-records-timeline",
s"stream-$streamId-records-histogram",
receivedRecords.data,
minX,
maxX,
minY,
maxY,
"events/sec")
graphUIDataForEventRate.generateDataJs(jsCollector)
"records/sec")
graphUIDataForRecordRate.generateDataJs(jsCollector)

<tr>
<td rowspan="2" style="vertical-align: middle; width: 151px;">
<div style="width: 151px;">
<div style="word-wrap: break-word;"><strong>{receiverName}</strong></div>
<div>Avg: {receivedRecords.formattedAvg} events/sec</div>
<div>Avg: {receivedRecords.formattedAvg} records/sec</div>
</div>
</td>
<td>{receiverActive}</td>
Expand All @@ -475,9 +476,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
</tr>
<tr>
<td colspan="3" class="timeline">
{graphUIDataForEventRate.generateTimelineHtml(jsCollector)}
{graphUIDataForRecordRate.generateTimelineHtml(jsCollector)}
</td>
<td class="histogram">{graphUIDataForEventRate.generateHistogramHtml(jsCollector)}</td>
<td class="histogram">{graphUIDataForRecordRate.generateHistogramHtml(jsCollector)}</td>
</tr>
}

Expand Down