diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index 960354d8..525606a1 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -391,15 +391,10 @@ object Configs { hiveEntryOpt = Option(hiveEntry) } - var hasKafka = false - val tags = mutable.ListBuffer[TagConfigEntry]() val tagConfigs = getConfigsOrNone(config, "tags") if (tagConfigs.isDefined) { for (tagConfig <- tagConfigs.get.asScala) { - if (hasKafka) { - throw new IllegalArgumentException("Can not define any other configs when kafka exists") - } if (!tagConfig.hasPath("name") || !tagConfig.hasPath("type.source") || !tagConfig.hasPath("type.sink")) { @@ -436,7 +431,6 @@ object Configs { val sourceConfig = dataSourceConfig(sourceCategory, tagConfig, nebulaConfig, variable, paths) LOG.info(s"Source Config ${sourceConfig}") - hasKafka = sourceCategory == SourceCategory.KAFKA val sinkCategory = toSinkCategory(tagConfig.getString("type.sink")) val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig) @@ -494,9 +488,6 @@ object Configs { val edgeConfigs = getConfigsOrNone(config, "edges") if (edgeConfigs.isDefined) { for (edgeConfig <- edgeConfigs.get.asScala) { - if (hasKafka) { - throw new IllegalArgumentException("Can not define any other configs when kafka exists") - } if (!edgeConfig.hasPath("name") || !edgeConfig.hasPath("type.source") || !edgeConfig.hasPath("type.sink")) { @@ -520,7 +511,6 @@ object Configs { val sourceConfig = dataSourceConfig(sourceCategory, edgeConfig, nebulaConfig, variable, paths) LOG.info(s"Source Config ${sourceConfig}") - hasKafka = sourceCategory == SourceCategory.KAFKA val sinkCategory = toSinkCategory(edgeConfig.getString("type.sink")) val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig) @@ -863,12 +853,29 @@ object Configs { val maxOffsetsPerTrigger = if (config.hasPath("maxOffsetsPerTrigger")) Some(config.getLong("maxOffsetsPerTrigger")) else None - KafkaSourceConfigEntry(SourceCategory.KAFKA, - intervalSeconds, - config.getString("service"), - config.getString("topic"), - startingOffsets, - maxOffsetsPerTrigger) + + val securityProtocol = + if (config.hasPath("securityProtocol")) Some(config.getString("securityProtocol")) + else None + val mechanism = + if (config.hasPath("mechanism")) Some(config.getString("mechanism")) else None + val kerberos = if (config.hasPath("kerberos")) config.getBoolean("kerberos") else false + val kerberosServiceName = + if (config.hasPath("kerberosServiceName")) config.getString("kerberosServiceName") + else null + + KafkaSourceConfigEntry( + SourceCategory.KAFKA, + intervalSeconds, + config.getString("service"), + config.getString("topic"), + startingOffsets, + maxOffsetsPerTrigger, + securityProtocol, + mechanism, + kerberos, + kerberosServiceName + ) case SourceCategory.PULSAR => val options = config.getObject("options").unwrapped.asScala.map(x => x._1 -> x._2.toString).toMap diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala index d16c7e64..2844cafe 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala @@ -213,9 +213,17 @@ case class KafkaSourceConfigEntry(override val category: SourceCategory.Value, server: String, topic: String, startingOffsets: String, - maxOffsetsPerTrigger: Option[Long] = None) + maxOffsetsPerTrigger: Option[Long] = None, + securityProtocol: Option[String] = None, + mechanism: Option[String] = None, + kerberos: Boolean = false, + kerberosServiceName: String = null) extends StreamingDataSourceConfigEntry { - require(server.trim.nonEmpty && topic.trim.nonEmpty) + require(server.trim.nonEmpty && topic.trim.nonEmpty, "server or topic cannot be empty") + require(securityProtocol.isEmpty || mechanism.isDefined, + "security protocol is defined, mechanism must be config.") + require(!kerberos || kerberosServiceName.nonEmpty, + "kerberos is true, service name must be config") override def toString: String = { s"Kafka source server: ${server} topic:${topic} startingOffsets:${startingOffsets} maxOffsetsPerTrigger:${maxOffsetsPerTrigger}" diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala index a3640698..3b6db592 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala @@ -46,6 +46,14 @@ class KafkaReader(override val session: SparkSession, .option("subscribe", kafkaConfig.topic) .option("startingOffsets", kafkaConfig.startingOffsets) + if (kafkaConfig.securityProtocol.isDefined) { + reader.option("kafka.security.protocol", kafkaConfig.securityProtocol.get) + reader.option("kafka.sasl.mechanism", kafkaConfig.mechanism.get) + } + if (kafkaConfig.kerberos) { + reader.option("kafka.sasl.kerberos.service.name", kafkaConfig.kerberosServiceName) + } + val maxOffsetsPerTrigger = kafkaConfig.maxOffsetsPerTrigger if (maxOffsetsPerTrigger.isDefined) reader.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger.get) diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala index a3640698..02bfd357 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala @@ -9,6 +9,8 @@ import com.vesoft.exchange.common.config.{KafkaSourceConfigEntry, PulsarSourceCo import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SparkSession} +import scala.collection.mutable + /** * Spark Streaming * @@ -46,6 +48,13 @@ class KafkaReader(override val session: SparkSession, .option("subscribe", kafkaConfig.topic) .option("startingOffsets", kafkaConfig.startingOffsets) + if(kafkaConfig.securityProtocol.isDefined){ + reader.option("kafka.security.protocol", kafkaConfig.securityProtocol.get) + reader.option("kafka.sasl.mechanism", kafkaConfig.mechanism.get) + } + if(kafkaConfig.kerberos){ + reader.option("kafka.sasl.kerberos.service.name", kafkaConfig.kerberosServiceName) + } val maxOffsetsPerTrigger = kafkaConfig.maxOffsetsPerTrigger if (maxOffsetsPerTrigger.isDefined) reader.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger.get) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala index a3640698..3b6db592 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala @@ -46,6 +46,14 @@ class KafkaReader(override val session: SparkSession, .option("subscribe", kafkaConfig.topic) .option("startingOffsets", kafkaConfig.startingOffsets) + if (kafkaConfig.securityProtocol.isDefined) { + reader.option("kafka.security.protocol", kafkaConfig.securityProtocol.get) + reader.option("kafka.sasl.mechanism", kafkaConfig.mechanism.get) + } + if (kafkaConfig.kerberos) { + reader.option("kafka.sasl.kerberos.service.name", kafkaConfig.kerberosServiceName) + } + val maxOffsetsPerTrigger = kafkaConfig.maxOffsetsPerTrigger if (maxOffsetsPerTrigger.isDefined) reader.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger.get)