diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index 8465432c5850..382ef3af264d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -26,15 +26,19 @@ import kafka.api._
import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
/**
+ * :: DeveloperApi ::
* Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form
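+ *
+ * A minimal construction sketch, assuming brokers at host1/host2 (placeholders):
+ * {{{
+ *   // broker addresses are placeholders, NOT zookeeper servers
+ *   val kc = new KafkaCluster(Map("metadata.broker.list" -> "host1:9092,host2:9092"))
+ * }}}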
*/
-private[spark]
+@DeveloperApi
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
@@ -224,7 +228,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// This 0 indicates the api version, in this case the original ZooKeeper-backed api.
private def defaultConsumerApiVersion: Short = 0
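+ // Each getter/setter below appears to have a matching overload taking an explicit
+ // consumerApiVersion (setConsumerOffsetMetadata's use further down suggests so),
+ // e.g. a sketch: kc.getConsumerOffsets("my-group", tps, 1.toShort)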
- /** Requires Kafka >= 0.8.1.1 */
+ /**
+  * Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper-backed api version.
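+  *
+  * A usage sketch (kc is a KafkaCluster; group and topic names are placeholders):
+  * {{{
+  *   kc.getConsumerOffsets("my-group", Set(TopicAndPartition("my-topic", 0))) match {
+  *     case Right(offsets) => // committed offset for each TopicAndPartition
+  *     case Left(errs) => // an Err, i.e. the Throwables accumulated along the way
+  *   }
+  * }}}
+  */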
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
@@ -243,7 +247,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}
- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper-backed api version. */
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
@@ -280,7 +284,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
Left(errs)
}
- /** Requires Kafka >= 0.8.1.1 */
+ /**
+  * Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper-backed api version.
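+  *
+  * A usage sketch (kc is a KafkaCluster; the offset value is a placeholder):
+  * {{{
+  *   kc.setConsumerOffsets("my-group", Map(TopicAndPartition("my-topic", 0) -> 123L))
+  * }}}
+  */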
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
@@ -298,7 +302,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}
- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper-backed api version. */
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetAndMetadata]
@@ -356,7 +360,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}
-private[spark]
+@DeveloperApi
object KafkaCluster {
type Err = ArrayBuffer[Throwable]
@@ -368,7 +372,6 @@ object KafkaCluster {
)
}
- private[spark]
case class LeaderOffset(host: String, port: Int, offset: Long)
/**
@@ -376,7 +379,6 @@ object KafkaCluster {
* Simple consumers connect directly to brokers, but need many of the same configs.
* This subclass won't warn about missing ZK params, or presence of broker params.
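+ *
+ * A sketch of typical use, via the companion factory below (kafkaParams as
+ * described for KafkaCluster above):
+ * {{{
+ *   val config = SimpleConsumerConfig(kafkaParams)
+ *   val (host, port) = config.seedBrokers.head
+ * }}}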
*/
- private[spark]
class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
@@ -388,7 +390,6 @@ object KafkaCluster {
}
}
- private[spark]
object SimpleConsumerConfig {
/**
* Make a consumer config without requiring group.id or zookeeper.connect,