@@ -26,15 +26,19 @@ import kafka.api._
import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException
+ import org.apache.spark.annotation.DeveloperApi

/**
+ * :: DeveloperApi ::
* Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form
*/
- private[spark]
+ @DeveloperApi
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}

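With private[spark] replaced by @DeveloperApi, the class can be constructed outside Spark's own code. A minimal usage sketch (not part of this diff; it assumes the class sits in the org.apache.spark.streaming.kafka package, which these hunks do not show):

import org.apache.spark.streaming.kafka.KafkaCluster

// Broker list, NOT ZooKeeper servers, in host1:port1,host2:port2 form,
// as the class scaladoc above requires.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val kc = new KafkaCluster(kafkaParams)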
@@ -224,7 +228,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// this 0 here indicates api version, in this case the original ZK backed api.
private def defaultConsumerApiVersion: Short = 0

- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
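A hedged sketch of fetching a group's committed offsets with getConsumerOffsets, reusing kc from the sketch above. The return type is cut off in this hunk, so the Either[Err, Map[TopicAndPartition, Long]] shape assumed below is inferred from the Left(errs) handling and the Err alias visible later in the diff; topic and group names are made up:

import kafka.common.TopicAndPartition

val partitions = Set(TopicAndPartition("events", 0), TopicAndPartition("events", 1))
kc.getConsumerOffsets("example-group", partitions) match {
  case Right(offsets) => offsets.foreach { case (tp, off) => println(s"$tp is at offset $off") }
  case Left(errs)     => errs.foreach(err => println(s"offset fetch failed: $err"))
}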
@@ -243,7 +247,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}

- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
@@ -280,7 +284,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
Left(errs)
}

- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
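A matching sketch for committing offsets with setConsumerOffsets, under the same assumptions (kc and the topic from the earlier sketches; an Either result whose Right value is ignored because this hunk does not show the return type):

kc.setConsumerOffsets("example-group", Map(TopicAndPartition("events", 0) -> 5000L)) match {
  case Right(_)   => println("offsets committed")
  case Left(errs) => errs.foreach(err => println(s"offset commit failed: $err"))
}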
@@ -298,7 +302,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}

- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetAndMetadata]
@@ -356,7 +360,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}

- private[spark]
+ @DeveloperApi
object KafkaCluster {
type Err = ArrayBuffer[Throwable]

@@ -368,15 +372,13 @@ object KafkaCluster {
)
}

- private[spark]
case class LeaderOffset(host: String, port: Int, offset: Long)

/**
* High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
* Simple consumers connect directly to brokers, but need many of the same configs.
* This subclass won't warn about missing ZK params, or presence of broker params.
*/
- private[spark]
class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
@@ -388,7 +390,6 @@
}
}

- private[spark]
object SimpleConsumerConfig {
/**
* Make a consumer config without requiring group.id or zookeeper.connect,
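The comments in the last two hunks explain why SimpleConsumerConfig exists: kafka.consumer.ConsumerConfig is written for ZooKeeper-backed high-level consumers, while the simple consumer talks to brokers directly, so this subclass accepts params without group.id or zookeeper.connect. A hedged illustration; the factory's exact signature is not shown in these hunks, so the apply call below is an assumption:

// Broker-only params: no zookeeper.connect and no group.id, which a plain
// ConsumerConfig would normally require (per the comment in the final hunk).
val brokerOnlyParams = Map("bootstrap.servers" -> "broker1:9092,broker2:9092")
val simpleConfig = KafkaCluster.SimpleConsumerConfig(brokerOnlyParams)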