Adding PulsarAdmin configs to PulsarHelper (#158)
* comment

* adding adminopts

* removing exception

* using moduleparams

* scalastyle

* readme

* retrigger

* adding test

* adding deleted tests back
ericm-db authored Aug 31, 2023
1 parent cd93f8d commit 3f2e904
Showing 5 changed files with 80 additions and 10 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -424,6 +424,17 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, Mess
Please check [Pulsar Client Configuration](https://pulsar.apache.org/docs/2.11.x/client-libraries-java/#client) for more details </td>
</tr>

<tr>
<td>`pulsar.admin.*`</td>
<td>Pulsar Admin configurations</td>
<td>No</td>
<td>None</td>
<td>Streaming and Batch</td>
<td>Configurations for the underlying Pulsar admin client. Example: `pulsar.admin.tlsAllowInsecureConnection`.

Please check [Pulsar Admin Configuration](https://pulsar.apache.org/docs/2.10.x/admin-api-overview/) for more details </td>
</tr>

<tr>
<td>`pulsar.reader.*`</td>
<td>Pulsar Reader configurations</td>
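For illustration, a hedged usage sketch of the new `pulsar.admin.*` option (the SparkSession, URLs, topic, and `maxBytesPerTrigger` value are placeholders, and the option key spellings are assumptions taken from the connector's README rather than from this diff):

// Sketch only: any key prefixed with "pulsar.admin." is forwarded to the
// underlying PulsarAdmin client once the prefix is stripped.
val stream = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")          // placeholder broker URL
  .option("admin.url", "http://localhost:8080")               // placeholder admin URL
  .option("topic", "my-topic")                                // placeholder topic
  .option("maxBytesPerTrigger", "1048576")                    // admission control is what uses the admin client
  .option("pulsar.admin.tlsAllowInsecureConnection", "true")  // example admin config
  .load()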
12 changes: 9 additions & 3 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.StructType
private[pulsar] case class PulsarHelper(
serviceUrl: String,
adminUrl: Option[String],
adminConf: ju.Map[String, Object],
clientConf: ju.Map[String, Object],
driverGroupIdPrefix: String,
caseInsensitiveParameters: Map[String, String],
@@ -66,7 +67,8 @@ private[pulsar] case class PulsarHelper(
// will only be called if latestOffset is called and there should
// be an exception thrown in PulsarProvider if maxBytes is set,
// and adminUrl is not set
private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get)
private lazy val admissionControlHelper = new
PulsarAdmissionControlHelper(adminUrl.get, adminConf)

override def close(): Unit = {
// do nothing
@@ -136,6 +138,9 @@ private[pulsar] case class PulsarHelper(
try {
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
val consumer = CachedConsumer.getOrCreate(tp, subscription, client)
// We need to do this because the consumer does not attempt to
// reconnect after calling .seek().
// TODO: Remove this once we have upgraded to a Pulsar client version where this is no longer needed
if (!consumer.isConnected) consumer.getLastMessageId
consumer.seek(mid)
} catch {
@@ -517,10 +522,11 @@ private[pulsar] case class PulsarHelper(
}
}

class PulsarAdmissionControlHelper(adminUrl: String)
class PulsarAdmissionControlHelper(adminUrl: String, conf: ju.Map[String, Object])
extends Logging {

private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()
private lazy val pulsarAdmin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).loadConf(conf).build()

import scala.collection.JavaConverters._

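As an aside, a minimal standalone sketch (not the connector code) of what the amended helper does with the admin map: the entries are handed to PulsarAdminBuilder.loadConf, and because the admin client is built lazily, an invalid key only surfaces when the client is first used. The URL below is a placeholder.

import java.{util => ju}

import org.apache.pulsar.client.admin.PulsarAdmin

object AdminConfSketch {
  def main(args: Array[String]): Unit = {
    // Build a PulsarAdmin the way PulsarAdmissionControlHelper now does,
    // loading an arbitrary config map on top of the admin service URL.
    val adminConf = new ju.HashMap[String, Object]()
    adminConf.put("tlsAllowInsecureConnection", java.lang.Boolean.TRUE)

    val admin = PulsarAdmin.builder()
      .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
      .loadConf(adminConf)
      .build()

    admin.close()
  }
}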
1 change: 1 addition & 0 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
@@ -22,6 +22,7 @@ private[pulsar] object PulsarOptions {

// option key prefix for different modules
val PulsarClientOptionKeyPrefix: String = "pulsar.client."
val PulsarAdminOptionKeyPrefix: String = "pulsar.admin."
val PulsarProducerOptionKeyPrefix: String = "pulsar.producer."
val PulsarReaderOptionKeyPrefix: String = "pulsar.reader."

23 changes: 19 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -56,13 +56,15 @@ private[pulsar] class PulsarProvider
parameters: Map[String, String]): (String, StructType) = {

val caseInsensitiveParams = validateStreamOptions(parameters)
val (clientConfig, _, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)
val (clientConfig, _, adminConfig,
serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)

val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}"
val inferredSchema = Utils.tryWithResource(
PulsarHelper(
serviceUrlConfig,
adminUrl,
adminConfig,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -85,14 +87,17 @@ private[pulsar] class PulsarProvider
logDebug(s"Creating Pulsar source: $parameters")

val caseInsensitiveParams = validateStreamOptions(parameters)
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (clientConfig, readerConfig,
adminConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)

logDebug(
s"Client config: $clientConfig; Reader config: $readerConfig; Service URL: $serviceUrl")

val subscriptionNamePrefix = getSubscriptionPrefix(parameters)
val pulsarHelper = PulsarHelper(
serviceUrl,
adminUrl,
adminConfig,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -134,11 +139,14 @@

val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true)

val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (clientConfig, readerConfig,
adminConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)

val (start, end, schema, pSchema) = Utils.tryWithResource(
PulsarHelper(
serviceUrl,
adminUrl,
adminConfig,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -258,6 +266,10 @@ private[pulsar] object PulsarProvider extends Logging {
}
}

private def getAdminParams(parameters: Map[String, String]): Map[String, String] = {
getModuleParams(parameters, PulsarAdminOptionKeyPrefix, clientConfKeys)
}

private def getProducerParams(parameters: Map[String, String]): Map[String, String] = {
getModuleParams(parameters, PulsarProducerOptionKeyPrefix, producerConfKeys)
}
@@ -507,17 +519,20 @@
}

private def prepareConfForReader(parameters: Map[String, String])
: (ju.Map[String, Object], ju.Map[String, Object], String, Option[String]) = {
: (ju.Map[String, Object], ju.Map[String, Object],
ju.Map[String, Object], String, Option[String]) = {

val serviceUrl = getServiceUrl(parameters)
val adminUrl = getAdminUrl(parameters)
var clientParams = getClientParams(parameters)
clientParams += (ServiceUrlOptionKey -> serviceUrl)
val readerParams = getReaderParams(parameters)
val adminParams = getAdminParams(parameters)

(
paramsToPulsarConf("pulsar.client", clientParams),
paramsToPulsarConf("pulsar.reader", readerParams),
paramsToPulsarConf("pulsar.admin", adminParams),
serviceUrl, adminUrl)
}

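The new getAdminParams reuses the existing getModuleParams machinery with the `pulsar.admin.` prefix; as a rough, hypothetical sketch (not the connector's implementation), the prefix extraction amounts to:

// Hypothetical helper: keep only keys that start with the module prefix and strip it,
// so "pulsar.admin.tlsAllowInsecureConnection" becomes "tlsAllowInsecureConnection".
def extractByPrefix(parameters: Map[String, String], prefix: String): Map[String, String] =
  parameters.collect {
    case (key, value) if key.toLowerCase(java.util.Locale.ROOT).startsWith(prefix) =>
      key.substring(prefix.length) -> value
  }

val example = Map(
  "topic" -> "my-topic",
  "pulsar.admin.tlsAllowInsecureConnection" -> "true")

// Map("tlsAllowInsecureConnection" -> "true")
val adminParams = extractByPrefix(example, "pulsar.admin.")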
43 changes: 40 additions & 3 deletions src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala
@@ -1,5 +1,7 @@
package org.apache.spark.sql.pulsar

import java.{util => ju}

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.internal.DefaultImplementation
@@ -33,18 +35,32 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
val firstLedger = getLedgerId(firstMid)
val firstEntry = getEntryId(firstMid)
require(getLatestOffsets(Set(topic)).size === 1)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, new ju.HashMap[String, Object]())
val offset = admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
assert(getLedgerId(offset) == firstLedger && getEntryId(offset) == firstEntry)
}

test("setting invalid config PulsarAdmin") {
val conf = new ju.HashMap[String, Object]()
conf.put("INVALID", "INVALID")

val topic = newTopic()
// Need to call latestOffsetForTopicPartition so the helper instantiates
// the lazily built admin client
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, conf)
val e = intercept[RuntimeException] {
admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
}
assert(e.getMessage.contains("Failed to load config into existing configuration data"))
}

test("Admit entry in the middle of the ledger") {
val topic = newTopic()
val messageIds = sendMessages(topic, Array("1", "2", "3"))
val firstMid = messageIds.head._2
val secondMid = messageIds.apply(1)._2
require(getLatestOffsets(Set(topic)).size === 1)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, new ju.HashMap[String, Object]())
val offset = admissionControlHelper.latestOffsetForTopicPartition(topic, firstMid, approxSizeOfInt)
assert(getLedgerId(offset) == getLedgerId(secondMid) && getEntryId(offset) == getEntryId(secondMid))

@@ -75,7 +91,7 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
AddPulsarData(Set(topic), 4, 5, 6, 7, 8, 9),
AssertOnQuery { query =>
query.recentProgress.map(microBatch =>
microBatch.numInputRows <= 4
microBatch.numInputRows <= 4
).forall(_ == true)
}
)
@@ -141,6 +157,27 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
)
}

test("Set invalid configuration for PulsarAdmin in stream options") {
val topic = newTopic()
sendMessages(topic, Array("-1"))
require(getLatestOffsets(Set(topic)).size === 1)

val e = intercept[IllegalArgumentException] {
spark.readStream
.format("pulsar")
.option(TopicSingle, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.option(AdminUrlOptionKey, adminUrl)
.option("pulsar.admin.INVALID", "INVALID")
.option(FailOnDataLossOptionKey, "true")
.option(MaxBytesPerTrigger, approxSizeOfInt * 3)
.load()
.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
}
assert(e.getMessage.contains("invalid not supported by pulsar"))
}

test("Admission Control with one topic-partition") {
val topic = newTopic()

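The README entry above marks `pulsar.admin.*` as available to batch queries as well; a hedged batch-read sketch (same placeholder URLs, topic, and assumed option keys as the streaming example earlier):

// Sketch only: the same "pulsar.admin." prefix applies to batch reads.
val df = spark.read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")          // placeholder broker URL
  .option("admin.url", "http://localhost:8080")               // placeholder admin URL
  .option("topic", "my-topic")                                // placeholder topic
  .option("pulsar.admin.tlsAllowInsecureConnection", "true")  // example admin config
  .load()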
