Skip to content

Commit

Permalink
Neng/allow provide subscription (#82)
Browse files Browse the repository at this point in the history
* allow predefined subscription for creating metadata reader

* handle subscription deletion

* checkstyle

* update
  • Loading branch information
nlu90 authored Jun 9, 2022
1 parent 318c13b commit 8db4c9b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 42 deletions.
110 changes: 71 additions & 39 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ private[pulsar] case class PulsarMetadataReader(
adminClientConf: ju.Map[String, Object],
driverGroupIdPrefix: String,
caseInsensitiveParameters: Map[String, String],
allowDifferentTopicSchemas: Boolean)
allowDifferentTopicSchemas: Boolean,
predefinedSubscription: Option[String])
extends Closeable
with Logging {

Expand All @@ -59,31 +60,39 @@ private[pulsar] case class PulsarMetadataReader(

def setupCursor(startingPos: PerTopicOffset): Unit = {
startingPos match {
case off: SpecificPulsarOffset => setupCursorByMid(off)
case time: SpecificPulsarStartingTime => setupCursorByTime(time)
case off: SpecificPulsarOffset => setupCursorByMid(off, predefinedSubscription)
case time: SpecificPulsarStartingTime => setupCursorByTime(time, predefinedSubscription)
case s => throw new UnsupportedOperationException(s"$s shouldn't appear here, a bug occurs.")
}
}

def setupCursorByMid(offset: SpecificPulsarOffset): Unit = {
def setupCursorByMid(offset: SpecificPulsarOffset, subscription: Option[String]): Unit = {
offset.topicOffsets.foreach {
case (tp, mid) =>
val umid = mid.asInstanceOf[UserProvidedMessageId]
try {
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", umid.mid)
} catch {
case _: PulsarAdminException.ConflictException =>
log.info("Subscription already exists, resetting the cursor to given offset")
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", umid.mid)
case e: Throwable =>
throw new RuntimeException(
s"Failed to setup cursor for ${TopicName.get(tp).toString}",
e)
val (subscriptionName, subscriptionPredefined) = extractSubscription(subscription, tp)

// setup the subscription
if (!subscriptionPredefined) {
try {
admin.topics().createSubscription(tp, subscriptionName, umid.mid)
} catch {
case _: PulsarAdminException.ConflictException =>
// if subscription already exists, log the info and continue to reset cursor
log.info("Subscription already exists...")
case e: Throwable =>
throw new RuntimeException(
s"Failed to setup cursor for ${TopicName.get(tp).toString}", e)
}
}

// reset cursor position
log.info(s"Resetting cursor for $subscriptionName to given offset")
admin.topics().resetCursor(tp, subscriptionName, umid.mid)
}
}

def setupCursorByTime(time: SpecificPulsarStartingTime): Unit = {
def setupCursorByTime(time: SpecificPulsarStartingTime, subscription: Option[String]): Unit = {
time.topicTimes.foreach {
case (tp, time) =>
val msgID = time match {
Expand All @@ -93,30 +102,47 @@ private[pulsar] case class PulsarMetadataReader(
case _ => throw new RuntimeException(s"Invalid starting time for $tp: $time")
}

val (subscriptionNames, subscriptionPredefined) = extractSubscription(subscription, tp)

// setup the subscription
try {
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", msgID)
} catch {
case _: PulsarAdminException.ConflictException =>
log.info("subscription already exists, resetting the cursor to given offset")
time match {
case PulsarProvider.EARLIEST_TIME | PulsarProvider.LATEST_TIME =>
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", msgID)
case _ =>
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", time)
}
case e: Throwable =>
throw new RuntimeException(
s"Failed to setup cursor for ${TopicName.get(tp).toString}", e)
if (!subscriptionPredefined) {
try {
admin.topics().createSubscription(tp, s"$subscriptionNames", msgID)
} catch {
case _: PulsarAdminException.ConflictException =>
// if subscription already exists, log the info and continue to reset cursor
log.info("subscription already exists...")
case e: Throwable =>
throw new RuntimeException(
s"Failed to setup cursor for ${TopicName.get(tp).toString}", e)
}
}

// reset cursor position
log.info(s"Resetting cursor for $subscriptionNames to given timestamp")
time match {
case PulsarProvider.EARLIEST_TIME | PulsarProvider.LATEST_TIME =>
admin.topics().resetCursor(tp, s"$subscriptionNames", msgID)
case _ =>
admin.topics().resetCursor(tp, s"$subscriptionNames", time)
}
}
}

private def extractSubscription(subscriptionName: Option[String],
topicPartition: String): (String, Boolean) = {
subscriptionName match {
case None => (s"$driverGroupIdPrefix-$topicPartition", false)
case Some(subName) => (subName, true)
}
}

def commitCursorToOffset(offset: Map[String, MessageId]): Unit = {
offset.foreach {
case (tp, mid) =>
try {
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", mid)
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
admin.topics().resetCursor(tp, s"$subscription", mid)
} catch {
case e: PulsarAdminException if e.getStatusCode == 404 || e.getStatusCode == 412 =>
logInfo(
Expand All @@ -132,15 +158,21 @@ private[pulsar] case class PulsarMetadataReader(
def removeCursor(): Unit = {
getTopics()
topics.foreach { tp =>
try {
admin.topics().deleteSubscription(tp, s"$driverGroupIdPrefix-$tp")
} catch {
case e: PulsarAdminException if e.getStatusCode == 404 =>
logInfo(s"Cannot remove cursor since the topic $tp has been deleted during execution.")
case e: Throwable =>
throw new RuntimeException(
s"Failed to remove cursor for ${TopicName.get(tp).toString}",
e)
val (subscriptionName, subscriptionPredefined) =
extractSubscription(predefinedSubscription, tp)

// Only delete a subscription if it's not predefined and created by us
if (!subscriptionPredefined) {
try {
admin.topics().deleteSubscription(tp, s"$subscriptionName")
} catch {
case e: PulsarAdminException if e.getStatusCode == 404 =>
logInfo(s"Cannot remove cursor since the topic $tp has been deleted during execution.")
case e: Throwable =>
throw new RuntimeException(
s"Failed to remove cursor for ${TopicName.get(tp).toString}",
e)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private[pulsar] object PulsarOptions {
val StartingTime = "startingtime"
val EndingOffsetsOptionKey = "endingoffsets"
val SubscriptionPrefix = "subscriptionprefix"
val PredefinedSubscription = "predefinedsubscription"

val PollTimeoutMS = "polltimeoutms"
val FailOnDataLossOptionKey = "failondataloss"
Expand Down
19 changes: 16 additions & 3 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ private[pulsar] class PulsarProvider
adminClientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
getAllowDifferentTopicSchemas(parameters))) { reader =>
getAllowDifferentTopicSchemas(parameters),
getPredefinedSubscription(parameters)
)) { reader =>
reader.getAndCheckCompatible(schema)
}
(shortName(), inferredSchema)
Expand All @@ -92,7 +94,8 @@ private[pulsar] class PulsarProvider
adminClientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
getAllowDifferentTopicSchemas(parameters))
getAllowDifferentTopicSchemas(parameters),
getPredefinedSubscription(parameters))

metadataReader.getAndCheckCompatible(schema)

Expand Down Expand Up @@ -133,7 +136,9 @@ private[pulsar] class PulsarProvider
adminClientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
getAllowDifferentTopicSchemas(parameters))) { reader =>
getAllowDifferentTopicSchemas(parameters),
getPredefinedSubscription(parameters)
)) { reader =>
val perTopicStarts = reader.startingOffsetForEachTopic(
caseInsensitiveParams,
EarliestOffset)
Expand Down Expand Up @@ -356,6 +361,14 @@ private[pulsar] object PulsarProvider extends Logging {
parameters.getOrElse(SubscriptionPrefix, s"$defaultPrefix-${UUID.randomUUID}")
}

private def getPredefinedSubscription(parameters: Map[String, String]): Option[String] = {
val sub = parameters.getOrElse(PredefinedSubscription, "")
sub match {
case "" => None
case s => Option(s)
}
}

private def getServiceUrl(parameters: Map[String, String]): String = {
parameters(ServiceUrlOptionKey)
}
Expand Down

0 comments on commit 8db4c9b

Please sign in to comment.