Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] remove admin parameter from sinks and admin options from config #122

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions src/main/scala/org/apache/spark/sql/pulsar/AdminUtils.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import org.apache.pulsar.common.naming.TopicName
private[pulsar] object PulsarOptions {

// option key prefix for different modules

val PulsarAdminOptionKeyPrefix: String = "pulsar.admin."
val PulsarClientOptionKeyPrefix: String = "pulsar.client."
val PulsarProducerOptionKeyPrefix: String = "pulsar.producer."
val PulsarReaderOptionKeyPrefix: String = "pulsar.reader."
Expand All @@ -38,7 +36,6 @@ private[pulsar] object PulsarOptions {
val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)

val ServiceUrlOptionKey: String = "service.url"
val AdminUrlOptionKey: String = "admin.url"
val StartingOffsetsOptionKey: String = "startingOffsets".toLowerCase(Locale.ROOT)
val StartingTime: String = "startingTime".toLowerCase(Locale.ROOT)
val EndingTime: String = "endingTime".toLowerCase(Locale.ROOT)
Expand Down
23 changes: 6 additions & 17 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,15 @@ private[pulsar] class PulsarProvider

val caseInsensitiveParams = validateSinkOptions(parameters)

val (clientConfig, producerConfig, topic, adminUrl) = prepareConfForProducer(parameters)
val (clientConfig, producerConfig, topic) = prepareConfForProducer(parameters)
PulsarSinks.validateQuery(data.schema.toAttributes, topic)

PulsarSinks.write(
sqlContext.sparkSession,
data.queryExecution,
clientConfig,
producerConfig,
topic,
adminUrl)
topic)

/**
* This method is supposed to return a relation of the data that was written. Currently we haven't
Expand Down Expand Up @@ -227,9 +226,9 @@ private[pulsar] class PulsarProvider

val caseInsensitiveParams = validateSinkOptions(parameters)

val (clientConfig, producerConfig, topic, adminUrl) = prepareConfForProducer(parameters)
val (clientConfig, producerConfig, topic) = prepareConfForProducer(parameters)

new PulsarSink(sqlContext, clientConfig, producerConfig, topic, adminUrl)
new PulsarSink(sqlContext, clientConfig, producerConfig, topic)
}
}

Expand Down Expand Up @@ -370,10 +369,6 @@ private[pulsar] object PulsarProvider extends Logging {
parameters(ServiceUrlOptionKey)
}

private def getAdminUrl(parameters: Map[String, String]): String = {
parameters(AdminUrlOptionKey)
}

private def getAllowDifferentTopicSchemas(parameters: Map[String, String]): Boolean = {
parameters.getOrElse(AllowDifferentTopicSchemas, "false").toBoolean
}
Expand Down Expand Up @@ -481,10 +476,6 @@ private[pulsar] object PulsarProvider extends Logging {
throw new IllegalArgumentException(s"$ServiceUrlOptionKey must be specified")
}

if (!caseInsensitiveParams.contains(AdminUrlOptionKey)) {
throw new IllegalArgumentException(s"$AdminUrlOptionKey must be specified")
}

val topicOptions =
caseInsensitiveParams.filter { case (k, _) => TopicOptionKeys.contains(k) }.toSeq.toMap
if (topicOptions.size > 1 || topicOptions.contains(TopicMulti) || topicOptions.contains(
Expand Down Expand Up @@ -512,10 +503,9 @@ private[pulsar] object PulsarProvider extends Logging {
}

private def prepareConfForProducer(parameters: Map[String, String])
: (ju.Map[String, Object], ju.Map[String, Object], Option[String], String) = {
: (ju.Map[String, Object], ju.Map[String, Object], Option[String]) = {

val serviceUrl = getServiceUrl(parameters)
val adminUrl = getAdminUrl(parameters)

var clientParams = getClientParams(parameters)
clientParams += (ServiceUrlOptionKey -> serviceUrl)
Expand All @@ -526,8 +516,7 @@ private[pulsar] object PulsarProvider extends Logging {
(
paramsToPulsarConf("pulsar.client", clientParams),
paramsToPulsarConf("pulsar.producer", producerParams),
topic,
adminUrl)
topic)
}

private def jsonOptions: JSONOptionsInRead = {
Expand Down
11 changes: 4 additions & 7 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ private[pulsar] class PulsarSink(
sqlContext: SQLContext,
pulsarClientConf: ju.Map[String, Object],
pulsarProducerConf: ju.Map[String, Object],
topic: Option[String],
adminUrl: String)
topic: Option[String])
extends Sink
with Logging {

Expand All @@ -53,8 +52,7 @@ private[pulsar] class PulsarSink(
data.queryExecution,
pulsarClientConf,
pulsarProducerConf,
topic,
adminUrl)
topic)
latestBatchId = batchId
}
}
Expand Down Expand Up @@ -141,8 +139,7 @@ private[pulsar] object PulsarSinks extends Logging {
queryExecution: QueryExecution,
pulsarClientConf: ju.Map[String, Object],
pulsarProducerConf: ju.Map[String, Object],
topic: Option[String],
adminUrl: String): Unit = {
topic: Option[String]): Unit = {

// validate the schema
val schema = queryExecution.analyzed.output
Expand All @@ -151,7 +148,7 @@ private[pulsar] object PulsarSinks extends Logging {
// execute RDD
queryExecution.toRdd.foreachPartition { iter =>
val writeTask =
new PulsarWriteTask(pulsarClientConf, pulsarProducerConf, topic, schema, adminUrl)
new PulsarWriteTask(pulsarClientConf, pulsarProducerConf, topic, schema)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(finallyBlock = writeTask.close())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ private[pulsar] class PulsarWriteTask(
clientConf: ju.Map[String, Object],
producerConf: ju.Map[String, Object],
topic: Option[String],
inputSchema: Seq[Attribute],
adminUrl: String)
extends PulsarRowWriter(inputSchema, clientConf, producerConf, topic, adminUrl) {
inputSchema: Seq[Attribute])
extends PulsarRowWriter(inputSchema, clientConf, producerConf, topic) {

/**
* Writes key value data out to topics.
Expand All @@ -53,13 +52,10 @@ private[pulsar] abstract class PulsarRowWriter(
inputSchema: Seq[Attribute],
clientConf: ju.Map[String, Object],
producerConf: ju.Map[String, Object],
topic: Option[String],
adminUrl: String) {
topic: Option[String]) {

import PulsarOptions._

protected lazy val admin = AdminUtils.buildAdmin(adminUrl, clientConf)

private def createProjections = {
val topicExpression = topic
.map(Literal(_))
Expand Down Expand Up @@ -213,6 +209,5 @@ private[pulsar] abstract class PulsarRowWriter(
protected def producerClose(): Unit = {
producerFlush()
topic2Producer.clear()
admin.close()
}
}