external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
@@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val sequenceNumberToProcessor =
-    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  // Protected by `sequenceNumberToProcessor`
Contributor: Could use the @GuardedBy("sequenceNumberToProcessor") javax annotation.

Member (Author): external/flume-sink doesn't depend on jsr305. GuardedBy is in jsr305***.jar. However, it's great to know that we can use GuardedBy in Spark.

Contributor: Ah, did not realize it was not a javax standard. Thanks.
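For reference, a minimal sketch of what the suggested annotation could look like if the module did pull in jsr305 (hypothetical: external/flume-sink does not declare that dependency, and `GuardedByExample` is a stand-in class, not the sink's code):

```scala
// Hypothetical sketch: assumes a jsr305 dependency, which
// external/flume-sink does not declare.
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

class GuardedByExample {
  // Documents, for readers and for static analyzers that understand
  // JSR-305 annotations, that these fields may only be accessed while
  // holding the monitor of `sequenceNumberToProcessor`.
  @GuardedBy("sequenceNumberToProcessor")
  private val sequenceNumberToProcessor = mutable.HashMap[String, String]()

  @GuardedBy("sequenceNumberToProcessor")
  private var stopped = false

  def shutdown(): Unit = sequenceNumberToProcessor.synchronized {
    stopped = true
    sequenceNumberToProcessor.clear()
  }
}
```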

+  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
   // restart.
@@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
-
-  @volatile private var stopped = false
+  // Protected by `sequenceNumberToProcessor`
+  private var stopped = false
 
   @volatile private var isTest = false
   private var testLatch: CountDownLatch = null
@@ -131,18 +131,19 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+    removeAndGetProcessor(sequenceNumber).foreach(processor => {
       processor.batchProcessed(success)
     })
   }

   /**
    * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
    * @param sequenceNumber
-   * @return The transaction processor for the corresponding batch. Note that this instance is no
-   *         longer tracked and the caller is responsible for that txn processor.
+   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+   *         instance is no longer tracked and the caller is responsible for that txn processor.
    */
-  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+      Option[TransactionProcessor] = {
     sequenceNumberToProcessor.synchronized {
       sequenceNumberToProcessor.remove(sequenceNumber.toString)
     }
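The dropped `Option(...)` wrapper in `completeTransaction` follows from the map swap: `java.util.Map#remove` returns the value or `null`, while Scala's `mutable.Map#remove` is already `Option`-valued. A small standalone sketch of the difference (illustrative maps and keys, not the sink's actual fields):

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable

object RemoveComparison extends App {
  val javaMap = new ConcurrentHashMap[String, String]()
  javaMap.put("seq-1", "processor-1")

  // java.util.Map#remove returns null for a missing key,
  // hence the old Option(...) wrapper at the call site.
  val fromJava: Option[String] = Option(javaMap.remove("seq-2")) // None

  val scalaMap = mutable.HashMap("seq-1" -> "processor-1")
  // scala.collection.mutable.Map#remove already returns an Option.
  val fromScala: Option[String] = scalaMap.remove("seq-1") // Some("processor-1")

  println(s"$fromJava, $fromScala")
}
```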
@@ -160,7 +161,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
     logInfo("Shutting down Spark Avro Callback Handler")
     sequenceNumberToProcessor.synchronized {
       stopped = true
-      sequenceNumberToProcessor.values().foreach(_.shutdown())
+      sequenceNumberToProcessor.values.foreach(_.shutdown())
     }
     transactionExecutorOpt.foreach(_.shutdownNow())
   }
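Taken together, the diff replaces the lock-free `ConcurrentHashMap` plus `@volatile` combination with a single monitor: every access to the map and to `stopped` happens inside `sequenceNumberToProcessor.synchronized`, which provides mutual exclusion and, under the Java memory model, the visibility guarantees that `@volatile` used to supply. A condensed sketch of the resulting pattern (stand-in types and method names, not the handler's full logic):

```scala
import scala.collection.mutable

// Condensed sketch of the locking pattern; () => Unit stands in for
// the sink's TransactionProcessor.
class Handler {
  // Both fields are protected by the monitor of `sequenceNumberToProcessor`;
  // entering and leaving `synchronized` also publishes writes, so `stopped`
  // needs no @volatile.
  private val sequenceNumberToProcessor = mutable.HashMap[String, () => Unit]()
  private var stopped = false

  // Refuse to track new work once shutdown has begun.
  def register(seq: String, shutdownHook: () => Unit): Boolean =
    sequenceNumberToProcessor.synchronized {
      if (stopped) {
        false
      } else {
        sequenceNumberToProcessor(seq) = shutdownHook
        true
      }
    }

  // Hand the processor back to the caller and stop tracking it.
  def removeAndGet(seq: String): Option[() => Unit] =
    sequenceNumberToProcessor.synchronized {
      sequenceNumberToProcessor.remove(seq)
    }

  def shutdown(): Unit = sequenceNumberToProcessor.synchronized {
    stopped = true
    sequenceNumberToProcessor.values.foreach(_.apply())
  }
}
```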