diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala index de5dfe27..13dd2543 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala @@ -41,7 +41,8 @@ private[pulsar] case class PulsarMetadataReader( adminClientConf: ju.Map[String, Object], driverGroupIdPrefix: String, caseInsensitiveParameters: Map[String, String], - allowDifferentTopicSchemas: Boolean) + allowDifferentTopicSchemas: Boolean, + predefinedSubscription: Option[String]) extends Closeable with Logging { @@ -59,31 +60,39 @@ private[pulsar] case class PulsarMetadataReader( def setupCursor(startingPos: PerTopicOffset): Unit = { startingPos match { - case off: SpecificPulsarOffset => setupCursorByMid(off) - case time: SpecificPulsarStartingTime => setupCursorByTime(time) + case off: SpecificPulsarOffset => setupCursorByMid(off, predefinedSubscription) + case time: SpecificPulsarStartingTime => setupCursorByTime(time, predefinedSubscription) case s => throw new UnsupportedOperationException(s"$s shouldn't appear here, a bug occurs.") } } - def setupCursorByMid(offset: SpecificPulsarOffset): Unit = { + def setupCursorByMid(offset: SpecificPulsarOffset, subscription: Option[String]): Unit = { offset.topicOffsets.foreach { case (tp, mid) => val umid = mid.asInstanceOf[UserProvidedMessageId] - try { - admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", umid.mid) - } catch { - case _: PulsarAdminException.ConflictException => - log.info("Subscription already exists, resetting the cursor to given offset") - admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", umid.mid) - case e: Throwable => - throw new RuntimeException( - s"Failed to setup cursor for ${TopicName.get(tp).toString}", - e) + val (subscriptionName, subscriptionPredefined) = extractSubscription(subscription, tp) + + // setup the subscription + if (!subscriptionPredefined) { + try { + admin.topics().createSubscription(tp, subscriptionName, umid.mid) + } catch { + case _: PulsarAdminException.ConflictException => + // if subscription already exists, log the info and continue to reset cursor + log.info("Subscription already exists...") + case e: Throwable => + throw new RuntimeException( + s"Failed to setup cursor for ${TopicName.get(tp).toString}", e) + } } + + // reset cursor position + log.info(s"Resetting cursor for $subscriptionName to given offset") + admin.topics().resetCursor(tp, subscriptionName, umid.mid) } } - def setupCursorByTime(time: SpecificPulsarStartingTime): Unit = { + def setupCursorByTime(time: SpecificPulsarStartingTime, subscription: Option[String]): Unit = { time.topicTimes.foreach { case (tp, time) => val msgID = time match { @@ -93,30 +102,47 @@ private[pulsar] case class PulsarMetadataReader( case _ => throw new RuntimeException(s"Invalid starting time for $tp: $time") } + val (subscriptionNames, subscriptionPredefined) = extractSubscription(subscription, tp) + // setup the subscription - try { - admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", msgID) - } catch { - case _: PulsarAdminException.ConflictException => - log.info("subscription already exists, resetting the cursor to given offset") - time match { - case PulsarProvider.EARLIEST_TIME | PulsarProvider.LATEST_TIME => - admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", msgID) - case _ => - admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", time) - } - case e: Throwable => - throw new RuntimeException( - s"Failed to setup cursor for ${TopicName.get(tp).toString}", e) + if (!subscriptionPredefined) { + try { + admin.topics().createSubscription(tp, s"$subscriptionNames", msgID) + } catch { + case _: PulsarAdminException.ConflictException => + // if subscription already exists, log the info and continue to reset cursor + log.info("subscription already exists...") + case e: Throwable => + throw new RuntimeException( + s"Failed to setup cursor for ${TopicName.get(tp).toString}", e) + } + } + + // reset cursor position + log.info(s"Resetting cursor for $subscriptionNames to given timestamp") + time match { + case PulsarProvider.EARLIEST_TIME | PulsarProvider.LATEST_TIME => + admin.topics().resetCursor(tp, s"$subscriptionNames", msgID) + case _ => + admin.topics().resetCursor(tp, s"$subscriptionNames", time) } } } + private def extractSubscription(subscriptionName: Option[String], + topicPartition: String): (String, Boolean) = { + subscriptionName match { + case None => (s"$driverGroupIdPrefix-$topicPartition", false) + case Some(subName) => (subName, true) + } + } + def commitCursorToOffset(offset: Map[String, MessageId]): Unit = { offset.foreach { case (tp, mid) => try { - admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", mid) + val (subscription, _) = extractSubscription(predefinedSubscription, tp) + admin.topics().resetCursor(tp, s"$subscription", mid) } catch { case e: PulsarAdminException if e.getStatusCode == 404 || e.getStatusCode == 412 => logInfo( @@ -132,15 +158,21 @@ private[pulsar] case class PulsarMetadataReader( def removeCursor(): Unit = { getTopics() topics.foreach { tp => - try { - admin.topics().deleteSubscription(tp, s"$driverGroupIdPrefix-$tp") - } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - logInfo(s"Cannot remove cursor since the topic $tp has been deleted during execution.") - case e: Throwable => - throw new RuntimeException( - s"Failed to remove cursor for ${TopicName.get(tp).toString}", - e) + val (subscriptionName, subscriptionPredefined) = + extractSubscription(predefinedSubscription, tp) + + // Only delete a subscription if it's not predefined and created by us + if (!subscriptionPredefined) { + try { + admin.topics().deleteSubscription(tp, s"$subscriptionName") + } catch { + case e: PulsarAdminException if e.getStatusCode == 404 => + logInfo(s"Cannot remove cursor since the topic $tp has been deleted during execution.") + case e: Throwable => + throw new RuntimeException( + s"Failed to remove cursor for ${TopicName.get(tp).toString}", + e) + } } } } diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala index 521eb37c..16e3db1b 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala @@ -45,6 +45,7 @@ private[pulsar] object PulsarOptions { val StartingTime = "startingtime" val EndingOffsetsOptionKey = "endingoffsets" val SubscriptionPrefix = "subscriptionprefix" + val PredefinedSubscription = "predefinedsubscription" val PollTimeoutMS = "polltimeoutms" val FailOnDataLossOptionKey = "failondataloss" diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala index 52fbc965..b4067542 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala @@ -68,7 +68,9 @@ private[pulsar] class PulsarProvider adminClientConfig, subscriptionNamePrefix, caseInsensitiveParams, - getAllowDifferentTopicSchemas(parameters))) { reader => + getAllowDifferentTopicSchemas(parameters), + getPredefinedSubscription(parameters) + )) { reader => reader.getAndCheckCompatible(schema) } (shortName(), inferredSchema) @@ -92,7 +94,8 @@ private[pulsar] class PulsarProvider adminClientConfig, subscriptionNamePrefix, caseInsensitiveParams, - getAllowDifferentTopicSchemas(parameters)) + getAllowDifferentTopicSchemas(parameters), + getPredefinedSubscription(parameters)) metadataReader.getAndCheckCompatible(schema) @@ -133,7 +136,9 @@ private[pulsar] class PulsarProvider adminClientConfig, subscriptionNamePrefix, caseInsensitiveParams, - getAllowDifferentTopicSchemas(parameters))) { reader => + getAllowDifferentTopicSchemas(parameters), + getPredefinedSubscription(parameters) + )) { reader => val perTopicStarts = reader.startingOffsetForEachTopic( caseInsensitiveParams, EarliestOffset) @@ -356,6 +361,14 @@ private[pulsar] object PulsarProvider extends Logging { parameters.getOrElse(SubscriptionPrefix, s"$defaultPrefix-${UUID.randomUUID}") } + private def getPredefinedSubscription(parameters: Map[String, String]): Option[String] = { + val sub = parameters.getOrElse(PredefinedSubscription, "") + sub match { + case "" => None + case s => Option(s) + } + } + private def getServiceUrl(parameters: Map[String, String]): String = { parameters(ServiceUrlOptionKey) }