Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rickyhuo.enhance.kafka #178

Merged
merged 3 commits into from
Nov 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ assemblyMergeStrategy in assembly := {
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "SimpleLog.class" => MergeStrategy.last
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case "UnusedStubClass.class" => MergeStrategy.last
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
Expand Down
2 changes: 1 addition & 1 deletion docs/en/configuration/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- [Fake](/en/configuration/input-plugins/Fake)
- [File](/en/configuration/input-plugins/File)
- [Hdfs](/en/configuration/input-plugins/Hdfs)
- [Kafka](/en/configuration/input-plugins/Kafka)
- [KafkaStream](/en/configuration/input-plugins/KafkaStream)
- [S3](/en/configuration/input-plugins/S3)
- [Socket](/en/configuration/input-plugins/Socket)
- [Filter Plugin](/en/configuration/filter-plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ This plugin uses Kafka Old Consumer. Supporting Kafka >= 0.8.2.X
| --- | --- | --- | --- |
| [topics](#topics-string) | string | yes | - |
| [consumer.group.id](#consumergroupid-string) | string | yes | - |
| [consumer.zookeeper.connect](#consumerzookeeperconnect-string) | string | yes | - |
| [consumer.bootstrap.servers](#consumerbootstrapservers-string) | string | yes | - |
| [consumer.*](#consumer-string) | string | no | - |

Expand All @@ -29,10 +28,6 @@ Kafka topic. Multiple topics separated by commas. For example, "tpc1,tpc2".

Kafka consumer group id, a unique string that identifies the consumer group this consumer belongs to.

##### consumer.zookeeper.connect [string]

Specifies the ZooKeeper connection string in the form `hostname:port` where host and port are the host and port of a ZooKeeper server

##### consumer.bootstrap.servers [string]

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.This string should be in the form `host1:port1,host2:port2,.... `
Expand All @@ -50,7 +45,6 @@ The way to specify parameters is to use the prefix "consumer" before the paramet
kafka {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
consumer.zookeeper.connect = "localhost:2181"
consumer.group.id = "waterdrop_group"
consumer.rebalance.max.retries = 100
}
Expand Down
6 changes: 0 additions & 6 deletions docs/zh-cn/configuration/input-plugins/KafkaStream.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Kafka Input实现Kafka的Old Consumer客户端, 从Kafka消费数据。支持的
| --- | --- | --- | --- |
| [topics](#topics-string) | string | yes | - |
| [consumer.group.id](#consumergroupid-string) | string | yes | - |
| [consumer.zookeeper.connect](#consumerzookeeperconnect-string) | string | yes | - |
| [consumer.bootstrap.servers](#consumerbootstrapservers-string) | string | yes | - |
| [consumer.*](#consumer-string) | string | no | - |

Expand All @@ -27,10 +26,6 @@ Kafka topic名称。如果有多个topic,用","分割,例如: "tpc1,tpc2"。

Kafka consumer group id,用于区分不同的消费组。

##### consumer.zookeeper.connect [string]

Kafka集群的Zookeeper地址

##### consumer.bootstrap.servers [string]

Kafka集群地址,多个用","隔开
Expand All @@ -47,7 +42,6 @@ Kafka集群地址,多个用","隔开
kafkaStream {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
consumer.zookeeper.connect = "localhost:2181"
consumer.group.id = "waterdrop_group"
consumer.rebalance.max.retries = 100
}
Expand Down
3 changes: 2 additions & 1 deletion waterdrop-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ providedDeps match {
libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
exclude("org.spark-project.spark", "unused"),
exclude("org.spark-project.spark", "unused")
exclude("net.jpountz.lz4", "unused"),
"com.typesafe" % "config" % "1.3.1",
"com.alibaba" % "QLExpress" % "3.2.0",
"com.alibaba" % "fastjson" % "1.2.47",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ class KafkaStream extends BaseStreamingInput {
config.hasPath("topics") match {
case true => {
val consumerConfig = config.getConfig(consumerPrefix)
consumerConfig.hasPath("zookeeper.connect") &&
!consumerConfig.getString("zookeeper.connect").trim.isEmpty &&
consumerConfig.hasPath("group.id") &&
consumerConfig.hasPath("group.id") &&
!consumerConfig.getString("group.id").trim.isEmpty match {
case true => (true, "")
case false =>
(false, "please specify [consumer.zookeeper.connect] and [consumer.group.id] as non-empty string")
(false, "please specify [consumer.group.id] as non-empty string")
}
}
case false => (false, "please specify [topics] as non-empty string, multiple topics separated by \",\"")
Expand Down