
Conversation

@markgrover
Member

First off, many thanks and credit to @nikit-os for starting this work. I am building on his good work in #10681.

Here is some context: Kafka has a new consumer API starting with Apache Kafka 0.9 (0.9.0.0, to be precise). This API supports security features like authentication (via Kerberos) and encryption (via SSL). The new API is in beta, but it is the only way we can integrate Spark and Kafka in a secure environment.
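To give a concrete idea, here is roughly what the security-related consumer configuration looks like with the 0.9 API. This is a sketch only; the broker addresses, paths, and passwords are placeholders, not values from this PR:

```scala
// Sketch of Kafka 0.9 consumer properties for an encrypted (SSL) connection.
// All hostnames, file paths, and passwords below are placeholders.
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "broker1:9093,broker2:9093",
  "group.id" -> "spark-streaming-consumer",
  "security.protocol" -> "SSL",  // or SASL_PLAINTEXT / SASL_SSL for Kerberos
  "ssl.truststore.location" -> "/path/to/truststore.jks",
  "ssl.truststore.password" -> "changeit",
  // For Kerberos (SASL), "sasl.kerberos.service.name" -> "kafka" would also be set.
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
```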

Kafka 0.9 ships with both the new and the old consumer API, so we can bump our Kafka dependency from 0.8.x to 0.9. However, the old consumer API in Kafka 0.9 is binary incompatible with the old consumer API in Kafka 0.8.x. There are also some minor API compatibility issues between the old API of Kafka 0.8.x and Kafka 0.9, mostly around the ZkClient and ZkUtils APIs. In almost all cases, users of the old API, whether they are using Kafka 0.8 or Kafka 0.9 with Spark, won't have to modify any code on their side. Only when they use the new consumer API from Kafka 0.9 will they have to modify their code.
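For anyone unfamiliar with what that code change looks like, below is a minimal sketch of the new 0.9 consumer API (plain Kafka client usage, independent of this PR); the broker and topic names are placeholders:

```scala
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Minimal sketch of the new 0.9 consumer API; broker and topic names are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("group.id", "example-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("my-topic"))  // new API: subscribe by topic name
val records = consumer.poll(1000)              // poll returns ConsumerRecords
records.asScala.foreach { r =>
  println(s"${r.topic()}-${r.partition()}@${r.offset()}: ${r.value()}")
}
consumer.close()
```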

In addition to supporting the new API from Kafka 0.9, we can support either the old API from Kafka 0.8 (which works with both Kafka 0.8 and Kafka 0.9 brokers) or the old API from Kafka 0.9 (which works only with Kafka 0.9 brokers). If we support the old API from 0.8.0, we complicate our integration: we will have to build two artifacts (kafka-assembly jars, that is), one for Kafka 0.8 and one for Kafka 0.9, and we'll have to add Maven profiles, one for Kafka 0.8 and one for Kafka 0.9. Alternatively, if we only support the old API from 0.9, users will have to upgrade Kafka brokers to 0.9 when using Spark 2.0.

In fact, users should ideally upgrade Kafka brokers before they upgrade Spark, since old 0.8.0 clients can work with the new 0.9.0 brokers. I am personally OK with requiring users to upgrade Kafka brokers to 0.9.0 since 1) this is going in Spark 2.0, 2) that's what the Kafka community is pushing for, and 3) I think the baggage of supporting Kafka 0.8's old consumer API might be too much to carry until another major release of Spark. Again, with Kafka 0.9's old consumer API, which we support, no user code needs to be changed, though users do need to upgrade their brokers.

As far as the code in this PR goes, we decided to put the code for the new API in a new package (org.apache.spark.streaming.kafka.newapi), creating a separate Maven artifact, spark-streaming-kafka-newapi_2.10.

In essence, this means that there are two KafkaCluster classes, two KafkaUtils classes, etc. (similar to how we have NewHadoopRDD), because there wasn't enough overlap between the two implementations to share code. Where there was enough overlap, for example KafkaTestUtils, there is only one implementation.
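As a rough illustration of what the parallel layout means for users (the class names here are my shorthand and may not match exactly what finally lands in this PR):

```scala
// Old (0.8-style) consumer API, existing artifact spark-streaming-kafka_2.10:
import org.apache.spark.streaming.kafka.KafkaUtils

// New 0.9 consumer API, proposed artifact spark-streaming-kafka-newapi_2.10
// (renamed import used here only to show both in one file):
import org.apache.spark.streaming.kafka.newapi.{KafkaUtils => NewKafkaUtils}
```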

I'd love to get this reviewed. Thanks in advance!

The examples for the new API have some extra required parameters; I will change them shortly to have reasonable defaults and be optional.

Follow on work:

  • This PR contains only the Scala/Java implementation. I am working on, and currently testing, a Python implementation in a separate branch at https://github.com/markgrover/spark/tree/kafka09-integration-python. I want to get this PR out and committed first; it will also help me gather feedback sooner rather than later.
  • Add and test authentication with Kerberos. For that, we need delegation tokens in Kafka (currently unresolved: https://issues.apache.org/jira/browse/KAFKA-1696). This is because, especially in the case of DirectStream, all executors consume directly from Kafka brokers, and initializing credentials at each executor can be seen as a DoS attack by the Ticket Granting Server.
  • Encryption support exists in this patch, but I haven't spent much time integration-testing it. If folks prefer taking it out until I have had a chance to integration-test it on my own cluster, I am open to that; it's only a few lines and a test that would need to be taken out.

nikit-os and others added 20 commits January 10, 2016 15:23
Getting rid of an extra dependency
…ad of v09 to avoid confusion. Python work is being done in a separate branch github.com/markgrover/spark/tree/kafka09-integration-python
@markgrover
Member Author

@tdas @harishreedharan @koeninger would appreciate your review here. Thanks!

@SparkQA

SparkQA commented Jan 27, 2016

Test build #50220 has finished for PR 10953 at commit b35ea65.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 27, 2016

Test build #50224 has finished for PR 10953 at commit b3f163e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markgrover
Member Author

OK, I have no idea what MiMa is, but I will take a look, try to run the checks locally, and fix the issues.

@vanzin
Contributor

vanzin commented Jan 27, 2016

MiMa is a binary compatibility checker. It's complaining that some changes you made caused the public APIs exposed in the compiled classes to change, meaning existing code compiled against the current Spark version might not run on the next Spark release.

First I'd look at whether those changes are necessary; if they are, it might be ok to add exclusions because we're being a bit lenient with API breakages in 2.0.
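For reference, exclusions go into project/MimaExcludes.scala and look roughly like the sketch below; the filter types are real MiMa problem classes, but the fully qualified names are placeholders, not the exact entries this PR needs:

```scala
import com.typesafe.tools.mima.core._

// Illustrative MiMa exclusions; the member/class names below are placeholders.
Seq(
  ProblemFilters.exclude[MissingMethodProblem](
    "org.apache.spark.streaming.kafka.KafkaUtils.someChangedMethod"),
  ProblemFilters.exclude[MissingClassProblem](
    "org.apache.spark.streaming.kafka.SomeRemovedClass")
)
```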

@markgrover
Member Author

Thanks Marcelo, I added excludes because I think it's ok to have such minor binary incompatibility in this case (Kafka 0.8->Kafka 0.9, Spark 1.6->Spark 2.0).

@SparkQA

SparkQA commented Jan 29, 2016

Test build #50332 has finished for PR 10953 at commit 41a8816.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

* NOT zookeeper servers, specified in host1:port1,host2:port2 form
*/
private[spark]
class NewKafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: Map[String, String])
Contributor

This is another place where we can't use 'new' in the class name. See #9007, which recently got merged. We have to make this a public class because of valid use cases like https://issues.apache.org/jira/browse/SPARK-13106.

Member Author

Thanks, it did conflict with this PR, but I have merged it. I will now make the NewKafkaCluster class public as well, similar to that change.
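In other words, the change being discussed is roughly the following (a sketch only; the class body is elided):

```scala
import scala.reflect.ClassTag

// Before: private[spark] class NewKafkaCluster[...]
// After: the private[spark] modifier is dropped so user code can construct it,
// mirroring what #9007 did for the 0.8-based KafkaCluster.
class NewKafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: Map[String, String]) {
  // existing methods unchanged
}
```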

@SparkQA

SparkQA commented Feb 9, 2016

Test build #50946 has finished for PR 10953 at commit 846a85a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 11, 2016

Test build #51125 has finished for PR 10953 at commit 9983e7d.

  • This patch fails build dependency tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 12, 2016

Test build #51193 has finished for PR 10953 at commit c422cd5.

  • This patch fails build dependency tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeansModel(JavaModel, MLWritable, MLReadable):
    • class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed,
    • class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed,
    • class ALSModel(JavaModel, MLWritable, MLReadable):
    • case class Grouping(child: Expression) extends Expression with Unevaluable
    • case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Unevaluable
    • case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
    • case class ReturnAnswer(child: LogicalPlan) extends UnaryNode
    • public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<InternalRow>
    • class ContinuousQueryManager(sqlContext: SQLContext)
    • case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode
    • trait BaseLimit extends UnaryNode
    • case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit
    • case class GlobalLimit(limit: Int, child: SparkPlan) extends BaseLimit
    • case class TakeOrderedAndProject(
    • class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
    • class FileStreamSource(
    • trait HadoopFsRelationProvider extends StreamSourceProvider
    • abstract class ContinuousQueryListener

Conflicts:
	project/MimaExcludes.scala
@SparkQA

SparkQA commented Mar 2, 2016

Test build #52280 has finished for PR 10953 at commit 075226e.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markgrover
Member Author

Will fix shortly. I was building against a custom-built version of Kafka because 0.9.1, which has some blocker fixes needed to make the tests pass, wasn't out yet.
On Mar 1, 2016 6:00 PM, "UCB AMPLab" notifications@github.com wrote:

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/52280/
Test FAILed.

@SparkQA

SparkQA commented Mar 2, 2016

Test build #52322 has finished for PR 10953 at commit 993c6fd.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markgrover
Member Author

Hmm, this is weird; these tests were passing before. Anyway, I was able to work around the failures by setting SPARK_DRIVER_MEMORY to a higher value on my local box. I'll kick off another test run to see if the failure was transient. If not, I'll have to see whether a bump is necessary here too, and why it is necessary now.

@markgrover
Member Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52352 has finished for PR 10953 at commit 993c6fd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52353 has finished for PR 10953 at commit 229b773.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

@markgrover Mind adding Closes #10681 to the PR description so that the merge script can close that one together with this?

@markgrover markgrover changed the title [SPARK-12177] [STREAMING] Update KafkaDStreams to new Kafka 0.9 Consumer API [SPARK-12177] [STREAMING] Update KafkaDStreams to new Kafka 0.9 Consumer API (Closes #10681) May 6, 2016
@Xaerxess

Kafka 0.10.0 is out; any plans to merge this PR or update it to the newest Kafka?

@koeninger
Contributor

I think this PR is fairly out of date with regard to things like the need to cache consumers on executors, getPreferredLocations, and a separate subproject for kafka-0.8.

@markgrover
Member Author

Yeah, I agree with @koeninger. This PR is pretty out of date; it makes sense to shift focus to Cody's PR #11863.

@markgrover markgrover closed this Jun 13, 2016
oalam pushed a commit to Hurence/logisland that referenced this pull request Jul 6, 2016